HADOOP-14028. S3A BlockOutputStreams doesn't delete temporary files in multipart uploads or handle part upload failures.

Contributed by Steve Loughran.

(cherry picked from commit 29fe5af017)
This commit is contained in:
Steve Loughran 2017-02-25 15:35:19 +00:00
parent 120bef7de8
commit dab00da19f
No known key found for this signature in database
GPG Key ID: 950CC3E032B79CA2
11 changed files with 653 additions and 280 deletions

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.fs.s3a;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
@ -48,7 +47,6 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.util.Progressable;
@ -178,10 +176,10 @@ class S3ABlockOutputStream extends OutputStream {
if (activeBlock == null) {
blockCount++;
if (blockCount>= Constants.MAX_MULTIPART_COUNT) {
LOG.error("Number of partitions in stream exceeds limit for S3: " +
LOG.error("Number of partitions in stream exceeds limit for S3: "
+ Constants.MAX_MULTIPART_COUNT + " write may fail.");
}
activeBlock = blockFactory.create(this.blockSize);
activeBlock = blockFactory.create(blockCount, this.blockSize, statistics);
}
return activeBlock;
}
@ -206,7 +204,9 @@ class S3ABlockOutputStream extends OutputStream {
* Clear the active block.
*/
private void clearActiveBlock() {
LOG.debug("Clearing active block");
if (activeBlock != null) {
LOG.debug("Clearing active block");
}
synchronized (this) {
activeBlock = null;
}
@ -356,11 +356,9 @@ class S3ABlockOutputStream extends OutputStream {
writeOperationHelper.writeFailed(ioe);
throw ioe;
} finally {
LOG.debug("Closing block and factory");
IOUtils.closeStream(block);
IOUtils.closeStream(blockFactory);
closeAll(LOG, block, blockFactory);
LOG.debug("Statistics: {}", statistics);
IOUtils.closeStream(statistics);
closeAll(LOG, statistics);
clearActiveBlock();
}
// All end of write operations, including deleting fake parent directories
@ -378,10 +376,10 @@ class S3ABlockOutputStream extends OutputStream {
final S3ADataBlocks.DataBlock block = getActiveBlock();
int size = block.dataSize();
final PutObjectRequest putObjectRequest =
writeOperationHelper.newPutRequest(
block.startUpload(),
size);
final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
final PutObjectRequest putObjectRequest = uploadData.hasFile() ?
writeOperationHelper.newPutRequest(uploadData.getFile())
: writeOperationHelper.newPutRequest(uploadData.getUploadStream(), size);
fs.setOptionalPutRequestParameters(putObjectRequest);
long transferQueueTime = now();
BlockUploadProgress callback =
@ -393,8 +391,14 @@ class S3ABlockOutputStream extends OutputStream {
executorService.submit(new Callable<PutObjectResult>() {
@Override
public PutObjectResult call() throws Exception {
PutObjectResult result = fs.putObjectDirect(putObjectRequest);
block.close();
PutObjectResult result;
try {
// the putObject call automatically closes the input
// stream afterwards.
result = writeOperationHelper.putObject(putObjectRequest);
} finally {
closeAll(LOG, uploadData, block);
}
return result;
}
});
@ -437,6 +441,14 @@ class S3ABlockOutputStream extends OutputStream {
return System.currentTimeMillis();
}
/**
* Get the statistics for this stream.
* @return stream statistics
*/
S3AInstrumentation.OutputStreamStatistics getStatistics() {
return statistics;
}
/**
* Multiple partition upload.
*/
@ -444,7 +456,7 @@ class S3ABlockOutputStream extends OutputStream {
private final String uploadId;
private final List<ListenableFuture<PartETag>> partETagsFutures;
public MultiPartUpload() throws IOException {
MultiPartUpload() throws IOException {
this.uploadId = writeOperationHelper.initiateMultiPartUpload();
this.partETagsFutures = new ArrayList<>(2);
LOG.debug("Initiated multi-part upload for {} with " +
@ -461,14 +473,16 @@ class S3ABlockOutputStream extends OutputStream {
throws IOException {
LOG.debug("Queueing upload of {}", block);
final int size = block.dataSize();
final InputStream uploadStream = block.startUpload();
final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
final int currentPartNumber = partETagsFutures.size() + 1;
final UploadPartRequest request =
writeOperationHelper.newUploadPartRequest(
uploadId,
uploadStream,
currentPartNumber,
size);
size,
uploadData.getUploadStream(),
uploadData.getFile());
long transferQueueTime = now();
BlockUploadProgress callback =
new BlockUploadProgress(
@ -483,12 +497,16 @@ class S3ABlockOutputStream extends OutputStream {
LOG.debug("Uploading part {} for id '{}'", currentPartNumber,
uploadId);
// do the upload
PartETag partETag = fs.uploadPart(request).getPartETag();
LOG.debug("Completed upload of {}", block);
LOG.debug("Stream statistics of {}", statistics);
// close the block
block.close();
PartETag partETag;
try {
partETag = fs.uploadPart(request).getPartETag();
LOG.debug("Completed upload of {} to part {}", block,
partETag.getETag());
LOG.debug("Stream statistics of {}", statistics);
} finally {
// close the stream and block
closeAll(LOG, uploadData, block);
}
return partETag;
}
});

View File

@ -24,10 +24,8 @@ import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
@ -42,10 +40,11 @@ import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.util.DirectBufferPool;
import static org.apache.hadoop.fs.s3a.S3ADataBlocks.DataBlock.DestState.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.closeAll;
/**
* Set of classes to support output streaming into blocks which are then
* uploaded as partitions.
* uploaded as to S3 as a single PUT, or as part of a multipart request.
*/
final class S3ADataBlocks {
@ -96,6 +95,70 @@ final class S3ADataBlocks {
}
}
/**
* The output information for an upload.
* It can be one of a file or an input stream.
* When closed, any stream is closed. Any source file is untouched.
*/
static final class BlockUploadData implements Closeable {
private final File file;
private final InputStream uploadStream;
/**
* File constructor; input stream will be null.
* @param file file to upload
*/
BlockUploadData(File file) {
Preconditions.checkArgument(file.exists(), "No file: " + file);
this.file = file;
this.uploadStream = null;
}
/**
* Stream constructor, file field will be null.
* @param uploadStream stream to upload
*/
BlockUploadData(InputStream uploadStream) {
Preconditions.checkNotNull(uploadStream, "rawUploadStream");
this.uploadStream = uploadStream;
this.file = null;
}
/**
* Predicate: does this instance contain a file reference.
* @return true if there is a file.
*/
boolean hasFile() {
return file != null;
}
/**
* Get the file, if there is one.
* @return the file for uploading, or null.
*/
File getFile() {
return file;
}
/**
* Get the raw upload stream, if the object was
* created with one.
* @return the upload stream or null.
*/
InputStream getUploadStream() {
return uploadStream;
}
/**
* Close: closes any upload stream provided in the constructor.
* @throws IOException inherited exception
*/
@Override
public void close() throws IOException {
closeAll(LOG, uploadStream);
}
}
/**
* Base class for block factories.
*/
@ -110,15 +173,21 @@ final class S3ADataBlocks {
/**
* Create a block.
*
* @param index index of block
* @param limit limit of the block.
* @param statistics stats to work with
* @return a new block.
*/
abstract DataBlock create(int limit) throws IOException;
abstract DataBlock create(long index, int limit,
S3AInstrumentation.OutputStreamStatistics statistics)
throws IOException;
/**
* Implement any close/cleanup operation.
* Base class is a no-op
* @throws IOException -ideally, it shouldn't.
* @throws IOException Inherited exception; implementations should
* avoid raising it.
*/
@Override
public void close() throws IOException {
@ -140,6 +209,14 @@ final class S3ADataBlocks {
enum DestState {Writing, Upload, Closed}
private volatile DestState state = Writing;
protected final long index;
protected final S3AInstrumentation.OutputStreamStatistics statistics;
protected DataBlock(long index,
S3AInstrumentation.OutputStreamStatistics statistics) {
this.index = index;
this.statistics = statistics;
}
/**
* Atomically enter a state, verifying current state.
@ -243,8 +320,8 @@ final class S3ADataBlocks {
* @return the stream
* @throws IOException trouble
*/
InputStream startUpload() throws IOException {
LOG.debug("Start datablock upload");
BlockUploadData startUpload() throws IOException {
LOG.debug("Start datablock[{}] upload", index);
enterState(Writing, Upload);
return null;
}
@ -278,6 +355,23 @@ final class S3ADataBlocks {
}
/**
* A block has been allocated.
*/
protected void blockAllocated() {
if (statistics != null) {
statistics.blockAllocated();
}
}
/**
* A block has been released.
*/
protected void blockReleased() {
if (statistics != null) {
statistics.blockReleased();
}
}
}
// ====================================================================
@ -292,8 +386,10 @@ final class S3ADataBlocks {
}
@Override
DataBlock create(int limit) throws IOException {
return new ByteArrayBlock(limit);
DataBlock create(long index, int limit,
S3AInstrumentation.OutputStreamStatistics statistics)
throws IOException {
return new ByteArrayBlock(0, limit, statistics);
}
}
@ -334,9 +430,13 @@ final class S3ADataBlocks {
// cache data size so that it is consistent after the buffer is reset.
private Integer dataSize;
ByteArrayBlock(int limit) {
ByteArrayBlock(long index,
int limit,
S3AInstrumentation.OutputStreamStatistics statistics) {
super(index, statistics);
this.limit = limit;
buffer = new S3AByteArrayOutputStream(limit);
blockAllocated();
}
/**
@ -349,12 +449,12 @@ final class S3ADataBlocks {
}
@Override
InputStream startUpload() throws IOException {
BlockUploadData startUpload() throws IOException {
super.startUpload();
dataSize = buffer.size();
ByteArrayInputStream bufferData = buffer.getInputStream();
buffer = null;
return bufferData;
return new BlockUploadData(bufferData);
}
@Override
@ -378,12 +478,14 @@ final class S3ADataBlocks {
@Override
protected void innerClose() {
buffer = null;
blockReleased();
}
@Override
public String toString() {
return "ByteArrayBlock{" +
"state=" + getState() +
return "ByteArrayBlock{"
+"index=" + index +
", state=" + getState() +
", limit=" + limit +
", dataSize=" + dataSize +
'}';
@ -395,12 +497,6 @@ final class S3ADataBlocks {
/**
* Stream via Direct ByteBuffers; these are allocated off heap
* via {@link DirectBufferPool}.
* This is actually the most complex of all the block factories,
* due to the need to explicitly recycle buffers; in comparison, the
* {@link DiskBlock} buffer delegates the work of deleting files to
* the {@link DiskBlock.FileDeletingInputStream}. Here the
* input stream {@link ByteBufferInputStream} has a similar task, along
* with the foundational work of streaming data from a byte array.
*/
static class ByteBufferBlockFactory extends BlockFactory {
@ -413,8 +509,10 @@ final class S3ADataBlocks {
}
@Override
ByteBufferBlock create(int limit) throws IOException {
return new ByteBufferBlock(limit);
ByteBufferBlock create(long index, int limit,
S3AInstrumentation.OutputStreamStatistics statistics)
throws IOException {
return new ByteBufferBlock(index, limit, statistics);
}
private ByteBuffer requestBuffer(int limit) {
@ -446,21 +544,27 @@ final class S3ADataBlocks {
/**
* A DataBlock which requests a buffer from pool on creation; returns
* it when the output stream is closed.
* it when it is closed.
*/
class ByteBufferBlock extends DataBlock {
private ByteBuffer buffer;
private ByteBuffer blockBuffer;
private final int bufferSize;
// cache data size so that it is consistent after the buffer is reset.
private Integer dataSize;
/**
* Instantiate. This will request a ByteBuffer of the desired size.
* @param index block index
* @param bufferSize buffer size
* @param statistics statistics to update
*/
ByteBufferBlock(int bufferSize) {
ByteBufferBlock(long index,
int bufferSize,
S3AInstrumentation.OutputStreamStatistics statistics) {
super(index, statistics);
this.bufferSize = bufferSize;
buffer = requestBuffer(bufferSize);
blockBuffer = requestBuffer(bufferSize);
blockAllocated();
}
/**
@ -473,13 +577,14 @@ final class S3ADataBlocks {
}
@Override
ByteBufferInputStream startUpload() throws IOException {
BlockUploadData startUpload() throws IOException {
super.startUpload();
dataSize = bufferCapacityUsed();
// set the buffer up from reading from the beginning
buffer.limit(buffer.position());
buffer.position(0);
return new ByteBufferInputStream(dataSize, buffer);
blockBuffer.limit(blockBuffer.position());
blockBuffer.position(0);
return new BlockUploadData(
new ByteBufferInputStream(dataSize, blockBuffer));
}
@Override
@ -489,182 +594,190 @@ final class S3ADataBlocks {
@Override
public int remainingCapacity() {
return buffer != null ? buffer.remaining() : 0;
return blockBuffer != null ? blockBuffer.remaining() : 0;
}
private int bufferCapacityUsed() {
return buffer.capacity() - buffer.remaining();
return blockBuffer.capacity() - blockBuffer.remaining();
}
@Override
int write(byte[] b, int offset, int len) throws IOException {
super.write(b, offset, len);
int written = Math.min(remainingCapacity(), len);
buffer.put(b, offset, written);
blockBuffer.put(b, offset, written);
return written;
}
/**
* Closing the block will release the buffer.
*/
@Override
protected void innerClose() {
buffer = null;
if (blockBuffer != null) {
blockReleased();
releaseBuffer(blockBuffer);
blockBuffer = null;
}
}
@Override
public String toString() {
return "ByteBufferBlock{"
+ "state=" + getState() +
+ "index=" + index +
", state=" + getState() +
", dataSize=" + dataSize() +
", limit=" + bufferSize +
", remainingCapacity=" + remainingCapacity() +
'}';
}
}
/**
* Provide an input stream from a byte buffer; supporting
* {@link #mark(int)}, which is required to enable replay of failed
* PUT attempts.
* This input stream returns the buffer to the pool afterwards.
*/
class ByteBufferInputStream extends InputStream {
private final int size;
private ByteBuffer byteBuffer;
ByteBufferInputStream(int size, ByteBuffer byteBuffer) {
LOG.debug("Creating ByteBufferInputStream of size {}", size);
this.size = size;
this.byteBuffer = byteBuffer;
}
/**
* Return the buffer to the pool after the stream is closed.
* Provide an input stream from a byte buffer; supporting
* {@link #mark(int)}, which is required to enable replay of failed
* PUT attempts.
*/
@Override
public synchronized void close() {
if (byteBuffer != null) {
LOG.debug("releasing buffer");
releaseBuffer(byteBuffer);
class ByteBufferInputStream extends InputStream {
private final int size;
private ByteBuffer byteBuffer;
ByteBufferInputStream(int size,
ByteBuffer byteBuffer) {
LOG.debug("Creating ByteBufferInputStream of size {}", size);
this.size = size;
this.byteBuffer = byteBuffer;
}
/**
* After the stream is closed, set the local reference to the byte
* buffer to null; this guarantees that future attempts to use
* stream methods will fail.
*/
@Override
public synchronized void close() {
LOG.debug("ByteBufferInputStream.close() for {}",
ByteBufferBlock.super.toString());
byteBuffer = null;
}
}
/**
* Verify that the stream is open.
* @throws IOException if the stream is closed
*/
private void verifyOpen() throws IOException {
if (byteBuffer == null) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
}
public synchronized int read() throws IOException {
if (available() > 0) {
return byteBuffer.get() & 0xFF;
} else {
return -1;
}
}
@Override
public synchronized long skip(long offset) throws IOException {
verifyOpen();
long newPos = position() + offset;
if (newPos < 0) {
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
}
if (newPos > size) {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
}
byteBuffer.position((int) newPos);
return newPos;
}
@Override
public synchronized int available() {
Preconditions.checkState(byteBuffer != null,
FSExceptionMessages.STREAM_IS_CLOSED);
return byteBuffer.remaining();
}
/**
* Get the current buffer position.
* @return the buffer position
*/
public synchronized int position() {
return byteBuffer.position();
}
/**
* Check if there is data left.
* @return true if there is data remaining in the buffer.
*/
public synchronized boolean hasRemaining() {
return byteBuffer.hasRemaining();
}
@Override
public synchronized void mark(int readlimit) {
LOG.debug("mark at {}", position());
byteBuffer.mark();
}
@Override
public synchronized void reset() throws IOException {
LOG.debug("reset");
byteBuffer.reset();
}
@Override
public boolean markSupported() {
return true;
}
/**
* Read in data.
* @param buffer destination buffer
* @param offset offset within the buffer
* @param length length of bytes to read
* @throws EOFException if the position is negative
* @throws IndexOutOfBoundsException if there isn't space for the
* amount of data requested.
* @throws IllegalArgumentException other arguments are invalid.
*/
@SuppressWarnings("NullableProblems")
public synchronized int read(byte[] buffer, int offset, int length)
throws IOException {
Preconditions.checkArgument(length >= 0, "length is negative");
Preconditions.checkArgument(buffer != null, "Null buffer");
if (buffer.length - offset < length) {
throw new IndexOutOfBoundsException(
FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER
+ ": request length =" + length
+ ", with offset =" + offset
+ "; buffer capacity =" + (buffer.length - offset));
}
verifyOpen();
if (!hasRemaining()) {
return -1;
/**
* Verify that the stream is open.
* @throws IOException if the stream is closed
*/
private void verifyOpen() throws IOException {
if (byteBuffer == null) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
}
int toRead = Math.min(length, available());
byteBuffer.get(buffer, offset, toRead);
return toRead;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
"ByteBufferInputStream{");
sb.append("size=").append(size);
ByteBuffer buffer = this.byteBuffer;
if (buffer != null) {
sb.append(", available=").append(buffer.remaining());
public synchronized int read() throws IOException {
if (available() > 0) {
return byteBuffer.get() & 0xFF;
} else {
return -1;
}
}
@Override
public synchronized long skip(long offset) throws IOException {
verifyOpen();
long newPos = position() + offset;
if (newPos < 0) {
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
}
if (newPos > size) {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
}
byteBuffer.position((int) newPos);
return newPos;
}
@Override
public synchronized int available() {
Preconditions.checkState(byteBuffer != null,
FSExceptionMessages.STREAM_IS_CLOSED);
return byteBuffer.remaining();
}
/**
* Get the current buffer position.
* @return the buffer position
*/
public synchronized int position() {
return byteBuffer.position();
}
/**
* Check if there is data left.
* @return true if there is data remaining in the buffer.
*/
public synchronized boolean hasRemaining() {
return byteBuffer.hasRemaining();
}
@Override
public synchronized void mark(int readlimit) {
LOG.debug("mark at {}", position());
byteBuffer.mark();
}
@Override
public synchronized void reset() throws IOException {
LOG.debug("reset");
byteBuffer.reset();
}
@Override
public boolean markSupported() {
return true;
}
/**
* Read in data.
* @param b destination buffer
* @param offset offset within the buffer
* @param length length of bytes to read
* @throws EOFException if the position is negative
* @throws IndexOutOfBoundsException if there isn't space for the
* amount of data requested.
* @throws IllegalArgumentException other arguments are invalid.
*/
@SuppressWarnings("NullableProblems")
public synchronized int read(byte[] b, int offset, int length)
throws IOException {
Preconditions.checkArgument(length >= 0, "length is negative");
Preconditions.checkArgument(b != null, "Null buffer");
if (b.length - offset < length) {
throw new IndexOutOfBoundsException(
FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER
+ ": request length =" + length
+ ", with offset =" + offset
+ "; buffer capacity =" + (b.length - offset));
}
verifyOpen();
if (!hasRemaining()) {
return -1;
}
int toRead = Math.min(length, available());
byteBuffer.get(b, offset, toRead);
return toRead;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
"ByteBufferInputStream{");
sb.append("size=").append(size);
ByteBuffer buf = this.byteBuffer;
if (buf != null) {
sb.append(", available=").append(buf.remaining());
}
sb.append(", ").append(ByteBufferBlock.super.toString());
sb.append('}');
return sb.toString();
}
sb.append('}');
return sb.toString();
}
}
}
@ -681,22 +794,29 @@ final class S3ADataBlocks {
}
/**
* Create a temp file and a block which writes to it.
* Create a temp file and a {@link DiskBlock} instance to manage it.
*
* @param index block index
* @param limit limit of the block.
* @param statistics statistics to update
* @return the new block
* @throws IOException IO problems
*/
@Override
DataBlock create(int limit) throws IOException {
DataBlock create(long index,
int limit,
S3AInstrumentation.OutputStreamStatistics statistics)
throws IOException {
File destFile = getOwner()
.createTmpFileForWrite("s3ablock", limit, getOwner().getConf());
return new DiskBlock(destFile, limit);
.createTmpFileForWrite(String.format("s3ablock-%04d-", index),
limit, getOwner().getConf());
return new DiskBlock(destFile, limit, index, statistics);
}
}
/**
* Stream to a file.
* This will stop at the limit; the caller is expected to create a new block
* This will stop at the limit; the caller is expected to create a new block.
*/
static class DiskBlock extends DataBlock {
@ -704,12 +824,17 @@ final class S3ADataBlocks {
private final File bufferFile;
private final int limit;
private BufferedOutputStream out;
private InputStream uploadStream;
private final AtomicBoolean closed = new AtomicBoolean(false);
DiskBlock(File bufferFile, int limit)
DiskBlock(File bufferFile,
int limit,
long index,
S3AInstrumentation.OutputStreamStatistics statistics)
throws FileNotFoundException {
super(index, statistics);
this.limit = limit;
this.bufferFile = bufferFile;
blockAllocated();
out = new BufferedOutputStream(new FileOutputStream(bufferFile));
}
@ -738,7 +863,7 @@ final class S3ADataBlocks {
}
@Override
InputStream startUpload() throws IOException {
BlockUploadData startUpload() throws IOException {
super.startUpload();
try {
out.flush();
@ -746,8 +871,7 @@ final class S3ADataBlocks {
out.close();
out = null;
}
uploadStream = new FileInputStream(bufferFile);
return new FileDeletingInputStream(uploadStream);
return new BlockUploadData(bufferFile);
}
/**
@ -755,6 +879,7 @@ final class S3ADataBlocks {
* exists.
* @throws IOException IO problems
*/
@SuppressWarnings("UnnecessaryDefault")
@Override
protected void innerClose() throws IOException {
final DestState state = getState();
@ -763,20 +888,19 @@ final class S3ADataBlocks {
case Writing:
if (bufferFile.exists()) {
// file was not uploaded
LOG.debug("Deleting buffer file as upload did not start");
boolean deleted = bufferFile.delete();
if (!deleted && bufferFile.exists()) {
LOG.warn("Failed to delete buffer file {}", bufferFile);
}
LOG.debug("Block[{}]: Deleting buffer file as upload did not start",
index);
closeBlock();
}
break;
case Upload:
LOG.debug("Buffer file {} exists —close upload stream", bufferFile);
LOG.debug("Block[{}]: Buffer file {} exists —close upload stream",
index, bufferFile);
break;
case Closed:
// no-op
closeBlock();
break;
default:
@ -798,7 +922,8 @@ final class S3ADataBlocks {
@Override
public String toString() {
String sb = "FileBlock{"
+ "destFile=" + bufferFile +
+ "index=" + index
+ ", destFile=" + bufferFile +
", state=" + getState() +
", dataSize=" + dataSize() +
", limit=" + limit +
@ -807,31 +932,20 @@ final class S3ADataBlocks {
}
/**
* An input stream which deletes the buffer file when closed.
* Close the block.
* This will delete the block's buffer file if the block has
* not previously been closed.
*/
private final class FileDeletingInputStream extends FilterInputStream {
private final AtomicBoolean closed = new AtomicBoolean(false);
FileDeletingInputStream(InputStream source) {
super(source);
}
/**
* Delete the input file when closed.
* @throws IOException IO problem
*/
@Override
public void close() throws IOException {
try {
super.close();
} finally {
if (!closed.getAndSet(true)) {
if (!bufferFile.delete()) {
LOG.warn("delete({}) returned false",
bufferFile.getAbsoluteFile());
}
}
void closeBlock() {
LOG.debug("block[{}]: closeBlock()", index);
if (!closed.getAndSet(true)) {
blockReleased();
if (!bufferFile.delete() && bufferFile.exists()) {
LOG.warn("delete({}) returned false",
bufferFile.getAbsoluteFile());
}
} else {
LOG.debug("block[{}]: skipping re-entrant closeBlock()", index);
}
}
}

View File

@ -1022,6 +1022,7 @@ public class S3AFileSystem extends FileSystem {
*/
public PutObjectRequest newPutObjectRequest(String key,
ObjectMetadata metadata, File srcfile) {
Preconditions.checkNotNull(srcfile);
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
srcfile);
setOptionalPutRequestParameters(putObjectRequest);
@ -1039,8 +1040,9 @@ public class S3AFileSystem extends FileSystem {
* @param inputStream source data.
* @return the request
*/
PutObjectRequest newPutObjectRequest(String key,
private PutObjectRequest newPutObjectRequest(String key,
ObjectMetadata metadata, InputStream inputStream) {
Preconditions.checkNotNull(inputStream);
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
inputStream, metadata);
setOptionalPutRequestParameters(putObjectRequest);
@ -1077,12 +1079,16 @@ public class S3AFileSystem extends FileSystem {
}
/**
* PUT an object, incrementing the put requests and put bytes
* Start a transfer-manager managed async PUT of an object,
* incrementing the put requests and put bytes
* counters.
* It does not update the other counters,
* as existing code does that as progress callbacks come in.
* Byte length is calculated from the file length, or, if there is no
* file, from the content length of the header.
* Because the operation is async, any stream supplied in the request
* must reference data (files, buffers) which stay valid until the upload
* completes.
* @param putObjectRequest the request
* @return the upload initiated
*/
@ -1108,6 +1114,7 @@ public class S3AFileSystem extends FileSystem {
* PUT an object directly (i.e. not via the transfer manager).
* Byte length is calculated from the file length, or, if there is no
* file, from the content length of the header.
* <i>Important: this call will close any input stream in the request.</i>
* @param putObjectRequest the request
* @return the upload initiated
* @throws AmazonClientException on problems
@ -1133,7 +1140,8 @@ public class S3AFileSystem extends FileSystem {
/**
* Upload part of a multi-partition file.
* Increments the write and put counters
* Increments the write and put counters.
* <i>Important: this call does not close any input stream in the request.</i>
* @param request request
* @return the result of the operation.
* @throws AmazonClientException on problems
@ -2309,14 +2317,28 @@ public class S3AFileSystem extends FileSystem {
/**
* Create a {@link PutObjectRequest} request.
* The metadata is assumed to have been configured with the size of the
* operation.
* If {@code length} is set, the metadata is configured with the size of
* the upload.
* @param inputStream source data.
* @param length size, if known. Use -1 for not known
* @return the request
*/
PutObjectRequest newPutRequest(InputStream inputStream, long length) {
return newPutObjectRequest(key, newObjectMetadata(length), inputStream);
PutObjectRequest request = newPutObjectRequest(key,
newObjectMetadata(length), inputStream);
return request;
}
/**
* Create a {@link PutObjectRequest} request to upload a file.
* @param sourceFile source file
* @return the request
*/
PutObjectRequest newPutRequest(File sourceFile) {
int length = (int) sourceFile.length();
PutObjectRequest request = newPutObjectRequest(key,
newObjectMetadata(length), sourceFile);
return request;
}
/**
@ -2379,6 +2401,8 @@ public class S3AFileSystem extends FileSystem {
Preconditions.checkNotNull(partETags);
Preconditions.checkArgument(!partETags.isEmpty(),
"No partitions have been uploaded");
LOG.debug("Completing multipart upload {} with {} parts",
uploadId, partETags.size());
return s3.completeMultipartUpload(
new CompleteMultipartUploadRequest(bucket,
key,
@ -2389,42 +2413,51 @@ public class S3AFileSystem extends FileSystem {
/**
* Abort a multipart upload operation.
* @param uploadId multipart operation Id
* @return the result
* @throws AmazonClientException on problems.
*/
void abortMultipartUpload(String uploadId) throws AmazonClientException {
LOG.debug("Aborting multipart upload {}", uploadId);
s3.abortMultipartUpload(
new AbortMultipartUploadRequest(bucket, key, uploadId));
}
/**
* Create and initialize a part request of a multipart upload.
* Exactly one of: {@code uploadStream} or {@code sourceFile}
* must be specified.
* @param uploadId ID of ongoing upload
* @param uploadStream source of data to upload
* @param partNumber current part number of the upload
* @param size amount of data
* @param uploadStream source of data to upload
* @param sourceFile optional source file.
* @return the request.
*/
UploadPartRequest newUploadPartRequest(String uploadId,
InputStream uploadStream,
int partNumber,
int size) {
int partNumber, int size, InputStream uploadStream, File sourceFile) {
Preconditions.checkNotNull(uploadId);
Preconditions.checkNotNull(uploadStream);
// exactly one source must be set; xor verifies this
Preconditions.checkArgument((uploadStream != null) ^ (sourceFile != null),
"Data source");
Preconditions.checkArgument(size > 0, "Invalid partition size %s", size);
Preconditions.checkArgument(partNumber> 0 && partNumber <=10000,
Preconditions.checkArgument(partNumber > 0 && partNumber <= 10000,
"partNumber must be between 1 and 10000 inclusive, but is %s",
partNumber);
LOG.debug("Creating part upload request for {} #{} size {}",
uploadId, partNumber, size);
return new UploadPartRequest()
UploadPartRequest request = new UploadPartRequest()
.withBucketName(bucket)
.withKey(key)
.withUploadId(uploadId)
.withInputStream(uploadStream)
.withPartNumber(partNumber)
.withPartSize(size);
if (uploadStream != null) {
// there's an upload stream. Bind to it.
request.setInputStream(uploadStream);
} else {
request.setFile(sourceFile);
}
return request;
}
/**
@ -2439,6 +2472,21 @@ public class S3AFileSystem extends FileSystem {
sb.append('}');
return sb.toString();
}
/**
* PUT an object directly (i.e. not via the transfer manager).
* @param putObjectRequest the request
* @return the upload initiated
* @throws IOException on problems
*/
PutObjectResult putObject(PutObjectRequest putObjectRequest)
throws IOException {
try {
return putObjectDirect(putObjectRequest);
} catch (AmazonClientException e) {
throw translateException("put", putObjectRequest.getKey(), e);
}
}
}
}

View File

@ -36,6 +36,7 @@ import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FileSystem.Statistics;
@ -428,7 +429,7 @@ public class S3AInstrumentation {
if (gauge != null) {
gauge.decr(count);
} else {
LOG.debug("No Gauge: " + op);
LOG.debug("No Gauge: {}", op);
}
}
@ -676,6 +677,8 @@ public class S3AInstrumentation {
private final AtomicLong transferDuration = new AtomicLong(0);
private final AtomicLong queueDuration = new AtomicLong(0);
private final AtomicLong exceptionsInMultipartFinalize = new AtomicLong(0);
private final AtomicInteger blocksAllocated = new AtomicInteger(0);
private final AtomicInteger blocksReleased = new AtomicInteger(0);
private Statistics statistics;
@ -683,6 +686,20 @@ public class S3AInstrumentation {
this.statistics = statistics;
}
/**
* A block has been allocated.
*/
void blockAllocated() {
blocksAllocated.incrementAndGet();
}
/**
* A block has been released.
*/
void blockReleased() {
blocksReleased.incrementAndGet();
}
/**
* Block is queued for upload.
*/
@ -778,6 +795,24 @@ public class S3AInstrumentation {
return queueDuration.get() + transferDuration.get();
}
public int blocksAllocated() {
return blocksAllocated.get();
}
public int blocksReleased() {
return blocksReleased.get();
}
/**
* Get counters of blocks actively allocated; my be inaccurate
* if the numbers change during the (non-synchronized) calculation.
* @return the number of actively allocated blocks.
*/
public int blocksActivelyAllocated() {
return blocksAllocated.get() - blocksReleased.get();
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
@ -789,6 +824,9 @@ public class S3AInstrumentation {
sb.append(", blockUploadsFailed=").append(blockUploadsFailed);
sb.append(", bytesPendingUpload=").append(bytesPendingUpload);
sb.append(", bytesUploaded=").append(bytesUploaded);
sb.append(", blocksAllocated=").append(blocksAllocated);
sb.append(", blocksReleased=").append(blocksReleased);
sb.append(", blocksActivelyAllocated=").append(blocksActivelyAllocated());
sb.append(", exceptionsInMultipartFinalize=").append(
exceptionsInMultipartFinalize);
sb.append(", transferDuration=").append(transferDuration).append(" ms");

View File

@ -733,4 +733,30 @@ public final class S3AUtils {
}
return null;
}
/**
* Close the Closeable objects and <b>ignore</b> any Exception or
* null pointers.
* (This is the SLF4J equivalent of that in {@code IOUtils}).
* @param log the log to log at debug level. Can be null.
* @param closeables the objects to close
*/
public static void closeAll(Logger log,
java.io.Closeable... closeables) {
for (java.io.Closeable c : closeables) {
if (c != null) {
try {
if (log != null) {
log.debug("Closing {}", c);
}
c.close();
} catch (Exception e) {
if (log != null && log.isDebugEnabled()) {
log.debug("Exception in closing {}", c, e);
}
}
}
}
}
}

View File

@ -24,9 +24,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.io.IOUtils;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import static org.apache.hadoop.fs.s3a.Constants.*;
@ -38,6 +41,14 @@ import static org.apache.hadoop.fs.s3a.Constants.*;
* multipart tests are kept in scale tests.
*/
public class ITestS3ABlockOutputArray extends AbstractS3ATestBase {
private static final int BLOCK_SIZE = 256 * 1024;
private static byte[] dataset;
@BeforeClass
public static void setupDataset() {
dataset = ContractTestUtils.dataset(BLOCK_SIZE, 0, 256);
}
@Override
protected Configuration createConfiguration() {
@ -65,9 +76,9 @@ public class ITestS3ABlockOutputArray extends AbstractS3ATestBase {
}
@Test(expected = IOException.class)
public void testDoubleStreamClose() throws Throwable {
Path dest = path("testDoubleStreamClose");
describe(" testDoubleStreamClose");
public void testWriteAfterStreamClose() throws Throwable {
Path dest = path("testWriteAfterStreamClose");
describe(" testWriteAfterStreamClose");
FSDataOutputStream stream = getFileSystem().create(dest, true);
byte[] data = ContractTestUtils.dataset(16, 'a', 26);
try {
@ -79,7 +90,25 @@ public class ITestS3ABlockOutputArray extends AbstractS3ATestBase {
}
}
public void verifyUpload(String name, int fileSize) throws IOException {
@Test
public void testBlocksClosed() throws Throwable {
Path dest = path("testBlocksClosed");
describe(" testBlocksClosed");
FSDataOutputStream stream = getFileSystem().create(dest, true);
S3AInstrumentation.OutputStreamStatistics statistics
= S3ATestUtils.getOutputStreamStatistics(stream);
byte[] data = ContractTestUtils.dataset(16, 'a', 26);
stream.write(data);
LOG.info("closing output stream");
stream.close();
assertEquals("total allocated blocks in " + statistics,
1, statistics.blocksAllocated());
assertEquals("actively allocated blocks in " + statistics,
0, statistics.blocksActivelyAllocated());
LOG.info("end of test case");
}
private void verifyUpload(String name, int fileSize) throws IOException {
Path dest = path(name);
describe(name + " upload to " + dest);
ContractTestUtils.createAndVerifyFile(
@ -87,4 +116,43 @@ public class ITestS3ABlockOutputArray extends AbstractS3ATestBase {
dest,
fileSize);
}
/**
* Create a factory for used in mark/reset tests.
* @param fileSystem source FS
* @return the factory
*/
protected S3ADataBlocks.BlockFactory createFactory(S3AFileSystem fileSystem) {
return new S3ADataBlocks.ArrayBlockFactory(fileSystem);
}
private void markAndResetDatablock(S3ADataBlocks.BlockFactory factory)
throws Exception {
S3AInstrumentation instrumentation =
new S3AInstrumentation(new URI("s3a://example"));
S3AInstrumentation.OutputStreamStatistics outstats
= instrumentation.newOutputStreamStatistics(null);
S3ADataBlocks.DataBlock block = factory.create(1, BLOCK_SIZE, outstats);
block.write(dataset, 0, dataset.length);
S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
InputStream stream = uploadData.getUploadStream();
assertNotNull(stream);
assertTrue("Mark not supported in " + stream, stream.markSupported());
assertEquals(0, stream.read());
stream.mark(BLOCK_SIZE);
// read a lot
long l = 0;
while (stream.read() != -1) {
// do nothing
l++;
}
stream.reset();
assertEquals(1, stream.read());
}
@Test
public void testMarkReset() throws Throwable {
markAndResetDatablock(createFactory(getFileSystem()));
}
}

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.fs.s3a;
/**
* Use {@link Constants#FAST_UPLOAD_BYTEBUFFER} for buffering.
*/
@ -27,4 +26,8 @@ public class ITestS3ABlockOutputByteBuffer extends ITestS3ABlockOutputArray {
return Constants.FAST_UPLOAD_BYTEBUFFER;
}
protected S3ADataBlocks.BlockFactory createFactory(S3AFileSystem fileSystem) {
return new S3ADataBlocks.ByteBufferBlockFactory(fileSystem);
}
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.fs.s3a;
import org.junit.Assume;
/**
* Use {@link Constants#FAST_UPLOAD_BUFFER_DISK} for buffering.
*/
@ -27,4 +29,14 @@ public class ITestS3ABlockOutputDisk extends ITestS3ABlockOutputArray {
return Constants.FAST_UPLOAD_BUFFER_DISK;
}
/**
* The disk stream doesn't support mark/reset; calls
* {@code Assume} to skip the test.
* @param fileSystem source FS
* @return null
*/
protected S3ADataBlocks.BlockFactory createFactory(S3AFileSystem fileSystem) {
Assume.assumeTrue("mark/reset nopt supoprted", false);
return null;
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
@ -544,4 +545,16 @@ public final class S3ATestUtils {
}
Assume.assumeTrue(message, condition);
}
/**
* Get the statistics from a wrapped block output stream.
* @param out output stream
* @return the (active) stats of the write
*/
public static S3AInstrumentation.OutputStreamStatistics
getOutputStreamStatistics(FSDataOutputStream out) {
S3ABlockOutputStream blockOutputStream
= (S3ABlockOutputStream) out.getWrappedStream();
return blockOutputStream.getStatistics();
}
}

View File

@ -51,9 +51,8 @@ public class TestDataBlocks extends Assert {
new S3ADataBlocks.ByteBufferBlockFactory(null)) {
int limit = 128;
S3ADataBlocks.ByteBufferBlockFactory.ByteBufferBlock block
= factory.create(limit);
assertEquals("outstanding buffers in " + factory,
1, factory.getOutstandingBufferCount());
= factory.create(1, limit, null);
assertOutstandingBuffers(factory, 1);
byte[] buffer = ContractTestUtils.toAsciiByteArray("test data");
int bufferLen = buffer.length;
@ -66,24 +65,23 @@ public class TestDataBlocks extends Assert {
block.hasCapacity(limit - bufferLen));
// now start the write
S3ADataBlocks.ByteBufferBlockFactory.ByteBufferInputStream
stream = block.startUpload();
S3ADataBlocks.BlockUploadData blockUploadData = block.startUpload();
S3ADataBlocks.ByteBufferBlockFactory.ByteBufferBlock.ByteBufferInputStream
stream =
(S3ADataBlocks.ByteBufferBlockFactory.ByteBufferBlock.ByteBufferInputStream)
blockUploadData.getUploadStream();
assertTrue("Mark not supported in " + stream, stream.markSupported());
assertTrue("!hasRemaining() in " + stream, stream.hasRemaining());
int expected = bufferLen;
assertEquals("wrong available() in " + stream,
expected, stream.available());
assertEquals('t', stream.read());
stream.mark(limit);
expected--;
assertEquals("wrong available() in " + stream,
expected, stream.available());
// close the block. The buffer must remain outstanding here;
// the stream manages the lifecycle of it now
block.close();
assertEquals("outstanding buffers in " + factory,
1, factory.getOutstandingBufferCount());
block.close();
// read into a byte array with an offset
int offset = 5;
@ -109,16 +107,31 @@ public class TestDataBlocks extends Assert {
0, stream.available());
assertTrue("hasRemaining() in " + stream, !stream.hasRemaining());
// go the mark point
stream.reset();
assertEquals('e', stream.read());
// when the stream is closed, the data should be returned
stream.close();
assertEquals("outstanding buffers in " + factory,
0, factory.getOutstandingBufferCount());
assertOutstandingBuffers(factory, 1);
block.close();
assertOutstandingBuffers(factory, 0);
stream.close();
assertEquals("outstanding buffers in " + factory,
0, factory.getOutstandingBufferCount());
assertOutstandingBuffers(factory, 0);
}
}
/**
* Assert the number of buffers active for a block factory.
* @param factory factory
* @param expectedCount expected count.
*/
private static void assertOutstandingBuffers(
S3ADataBlocks.ByteBufferBlockFactory factory,
int expectedCount) {
assertEquals("outstanding buffers in " + factory,
expectedCount, factory.getOutstandingBufferCount());
}
}

View File

@ -34,11 +34,13 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.util.Progressable;
@ -159,13 +161,20 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
Statistic putBytesPending = Statistic.OBJECT_PUT_BYTES_PENDING;
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
S3AInstrumentation.OutputStreamStatistics streamStatistics;
long blocksPer10MB = blocksPerMB * 10;
ProgressCallback progress = new ProgressCallback(timer);
try (FSDataOutputStream out = fs.create(hugefile,
true,
uploadBlockSize,
progress)) {
try {
streamStatistics = getOutputStreamStatistics(out);
} catch (ClassCastException e) {
LOG.info("Wrapped output stream is not block stream: {}",
out.getWrappedStream());
streamStatistics = null;
}
for (long block = 1; block <= blocks; block++) {
out.write(data);
@ -190,7 +199,8 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
}
}
// now close the file
LOG.info("Closing file and completing write operation");
LOG.info("Closing stream {}", out);
LOG.info("Statistics : {}", streamStatistics);
ContractTestUtils.NanoTimer closeTimer
= new ContractTestUtils.NanoTimer();
out.close();
@ -201,6 +211,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
filesizeMB, uploadBlockSize);
logFSState();
bandwidth(timer, filesize);
LOG.info("Statistics after stream closed: {}", streamStatistics);
long putRequestCount = storageStatistics.getLong(putRequests);
Long putByteCount = storageStatistics.getLong(putBytes);
LOG.info("PUT {} bytes in {} operations; {} MB/operation",
@ -214,7 +225,14 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
S3AFileStatus status = fs.getFileStatus(hugefile);
ContractTestUtils.assertIsFile(hugefile, status);
assertEquals("File size in " + status, filesize, status.getLen());
progress.verifyNoFailures("Put file " + hugefile + " of size " + filesize);
if (progress != null) {
progress.verifyNoFailures("Put file " + hugefile
+ " of size " + filesize);
}
if (streamStatistics != null) {
assertEquals("actively allocated blocks in " + streamStatistics,
0, streamStatistics.blocksActivelyAllocated());
}
}
/**
@ -285,7 +303,9 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
void assumeHugeFileExists() throws IOException {
S3AFileSystem fs = getFileSystem();
ContractTestUtils.assertPathExists(fs, "huge file not created", hugefile);
ContractTestUtils.assertIsFile(fs, hugefile);
FileStatus status = fs.getFileStatus(hugefile);
ContractTestUtils.assertIsFile(hugefile, status);
assertTrue("File " + hugefile + " is empty", status.getLen() > 0);
}
private void logFSState() {