diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index 6949c67f278..1ea44df5032 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -464,9 +464,4 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { /** Default value for IOStatistics logging level. */ public static final String IOSTATISTICS_LOGGING_LEVEL_DEFAULT = IOSTATISTICS_LOGGING_LEVEL_DEBUG; - - /** - * default hadoop temp dir on local system: {@value}. - */ - public static final String HADOOP_TMP_DIR = "hadoop.tmp.dir"; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java index 7e9137294c1..bbb8517118e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java @@ -358,18 +358,6 @@ public final class StreamStatisticNames { public static final String REMOTE_BYTES_READ = "remote_bytes_read"; - /** - * Total number of Data blocks allocated by an outputStream. - */ - public static final String BLOCKS_ALLOCATED - = "blocks_allocated"; - - /** - * Total number of Data blocks released by an outputStream. - */ - public static final String BLOCKS_RELEASED - = "blocks_released"; - private StreamStatisticNames() { } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/BlockUploadStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/BlockUploadStatistics.java deleted file mode 100644 index bf7cbbbc5d5..00000000000 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/BlockUploadStatistics.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.store; - -public interface BlockUploadStatistics { - - /** - * A block has been allocated. - */ - void blockAllocated(); - - /** - * A block has been released. 
- */ - void blockReleased(); - -} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java deleted file mode 100644 index 6602ab579f5..00000000000 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java +++ /dev/null @@ -1,1119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.store; - -import java.io.BufferedOutputStream; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.Closeable; -import java.io.EOFException; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.commons.io.FileUtils; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.fs.FSExceptionMessages; -import org.apache.hadoop.fs.LocalDirAllocator; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.util.DirectBufferPool; - -import static java.util.Objects.requireNonNull; -import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_TMP_DIR; -import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Closed; -import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Upload; -import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Writing; -import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; - -/** - * A class to provide disk, byteBuffer and byteArray option for Filesystem - * OutputStreams. - * - *
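As context for the deletion, a minimal sketch (not part of this patch) of how a caller selected one of the three buffer mechanisms through the factory API defined further down in this removed file; the config key and option constants are taken from the patch itself:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.store.DataBlocks;

// Hedged sketch: pick the disk-backed buffer; DATA_BLOCKS_BUFFER_ARRAY and
// DATA_BLOCKS_BYTEBUFFER select the on-heap and off-heap variants instead.
Configuration conf = new Configuration();
DataBlocks.BlockFactory factory = DataBlocks.createFactory(
    "fs.azure.buffer.dir",                // key naming the buffer-dir setting
    conf,
    DataBlocks.DATA_BLOCKS_BUFFER_DISK);
```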

- * Implementation of DataBlocks taken from HADOOP-13560 to support huge file - * uploads in S3A with different options rather than one. - */ -public final class DataBlocks { - - private static final Logger LOG = - LoggerFactory.getLogger(DataBlocks.class); - - /** - * Buffer blocks to disk. - * Capacity is limited to available disk space. - */ - public static final String DATA_BLOCKS_BUFFER_DISK = "disk"; - - /** - * Use a byte buffer. - */ - public static final String DATA_BLOCKS_BYTEBUFFER = "bytebuffer"; - - /** - * Use an in-memory array. Fast but will run of heap rapidly. - */ - public static final String DATA_BLOCKS_BUFFER_ARRAY = "array"; - - private DataBlocks() { - } - - /** - * Validate args to a write command. These are the same validation checks - * expected for any implementation of {@code OutputStream.write()}. - * - * @param b byte array containing data. - * @param off offset in array where to start. - * @param len number of bytes to be written. - * @throws NullPointerException for a null buffer - * @throws IndexOutOfBoundsException if indices are out of range - */ - public static void validateWriteArgs(byte[] b, int off, int len) - throws IOException { - Preconditions.checkNotNull(b); - if ((off < 0) || (off > b.length) || (len < 0) || - ((off + len) > b.length) || ((off + len) < 0)) { - throw new IndexOutOfBoundsException( - "write (b[" + b.length + "], " + off + ", " + len + ')'); - } - } - - /** - * Create a factory. - * - * @param keyToBufferDir Key to buffer directory config for a FS. - * @param configuration factory configurations. - * @param name factory name -the option from {@link CommonConfigurationKeys}. - * @return the factory, ready to be initialized. - * @throws IllegalArgumentException if the name is unknown. - */ - public static BlockFactory createFactory(String keyToBufferDir, - Configuration configuration, - String name) { - LOG.debug("Creating DataFactory of type : {}", name); - switch (name) { - case DATA_BLOCKS_BUFFER_ARRAY: - return new ArrayBlockFactory(keyToBufferDir, configuration); - case DATA_BLOCKS_BUFFER_DISK: - return new DiskBlockFactory(keyToBufferDir, configuration); - case DATA_BLOCKS_BYTEBUFFER: - return new ByteBufferBlockFactory(keyToBufferDir, configuration); - default: - throw new IllegalArgumentException("Unsupported block buffer" + - " \"" + name + '"'); - } - } - - /** - * The output information for an upload. - * It can be one of a file, an input stream or a byteArray. - * {@link #toByteArray()} method to be used to convert the data into byte - * array to be doen in this class as well. - * When closed, any stream is closed. Any source file is untouched. - */ - public static final class BlockUploadData implements Closeable { - private final File file; - private InputStream uploadStream; - private byte[] byteArray; - private boolean isClosed; - - /** - * Constructor for byteArray upload data block. File and uploadStream - * would be null. - * - * @param byteArray byteArray used to construct BlockUploadData. - */ - public BlockUploadData(byte[] byteArray) { - this.file = null; - this.uploadStream = null; - - this.byteArray = requireNonNull(byteArray); - } - - /** - * File constructor; input stream and byteArray will be null. - * - * @param file file to upload - */ - BlockUploadData(File file) { - Preconditions.checkArgument(file.exists(), "No file: %s", file); - this.file = file; - this.uploadStream = null; - this.byteArray = null; - } - - /** - * Stream constructor, file and byteArray field will be null. 
- * - * @param uploadStream stream to upload. - */ - BlockUploadData(InputStream uploadStream) { - requireNonNull(uploadStream, "rawUploadStream"); - this.uploadStream = uploadStream; - this.file = null; - this.byteArray = null; - } - - /** - * Predicate: does this instance contain a file reference. - * - * @return true if there is a file. - */ - boolean hasFile() { - return file != null; - } - - /** - * Get the file, if there is one. - * - * @return the file for uploading, or null. - */ - File getFile() { - return file; - } - - /** - * Get the raw upload stream, if the object was - * created with one. - * - * @return the upload stream or null. - */ - InputStream getUploadStream() { - return uploadStream; - } - - /** - * Convert to a byte array. - * If the data is stored in a file, it will be read and returned. - * If the data was passed in via an input stream (which happens if the - * data is stored in a bytebuffer) then it will be converted to a byte - * array -which will then be cached for any subsequent use. - * - * @return byte[] after converting the uploadBlock. - * @throws IOException throw if an exception is caught while reading - * File/InputStream or closing InputStream. - */ - public byte[] toByteArray() throws IOException { - Preconditions.checkState(!isClosed, "Block is closed"); - if (byteArray != null) { - return byteArray; - } - if (file != null) { - // Need to save byteArray here so that we don't read File if - // byteArray() is called more than once on the same file. - byteArray = FileUtils.readFileToByteArray(file); - return byteArray; - } - byteArray = IOUtils.toByteArray(uploadStream); - IOUtils.close(uploadStream); - uploadStream = null; - return byteArray; - } - - /** - * Close: closes any upload stream and byteArray provided in the - * constructor. - * - * @throws IOException inherited exception. - */ - @Override - public void close() throws IOException { - isClosed = true; - cleanupWithLogger(LOG, uploadStream); - byteArray = null; - if (file != null) { - file.delete(); - } - } - } - - /** - * Base class for block factories. - */ - public static abstract class BlockFactory implements Closeable { - - private final String keyToBufferDir; - private final Configuration conf; - - protected BlockFactory(String keyToBufferDir, Configuration conf) { - this.keyToBufferDir = keyToBufferDir; - this.conf = conf; - } - - /** - * Create a block. - * - * @param index index of block - * @param limit limit of the block. - * @param statistics stats to work with - * @return a new block. - */ - public abstract DataBlock create(long index, int limit, - BlockUploadStatistics statistics) - throws IOException; - - /** - * Implement any close/cleanup operation. - * Base class is a no-op. - * - * @throws IOException Inherited exception; implementations should - * avoid raising it. - */ - @Override - public void close() throws IOException { - } - - /** - * Configuration. - * - * @return config passed to create the factory. - */ - protected Configuration getConf() { - return conf; - } - - /** - * Key to Buffer Directory config for a FS instance. - * - * @return String containing key to Buffer dir. - */ - public String getKeyToBufferDir() { - return keyToBufferDir; - } - } - - /** - * This represents a block being uploaded. 
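Before the abstract class itself, a hedged sketch of the lifecycle it encodes (the DestState enum below: Writing, then Upload, then Closed); `factory` is assumed from the earlier sketch, the 1 MB limit is arbitrary, and IOException handling is elided:

```java
import org.apache.hadoop.fs.store.DataBlocks;

DataBlocks.DataBlock block = factory.create(0, 1024 * 1024, null); // Writing
byte[] data = new byte[1024];
int written = block.write(data, 0, data.length);   // bounded by remaining capacity
DataBlocks.BlockUploadData upload = block.startUpload(); // Writing -> Upload
byte[] payload = upload.toByteArray();  // materialize the data for the PUT
upload.close();  // closes any stream and deletes a backing file, if present
block.close();   // -> Closed; releases the block's buffer or file
```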
- */ - public static abstract class DataBlock implements Closeable { - - enum DestState {Writing, Upload, Closed} - - private volatile DestState state = Writing; - protected final long index; - private final BlockUploadStatistics statistics; - - protected DataBlock(long index, - BlockUploadStatistics statistics) { - this.index = index; - this.statistics = statistics; - } - - /** - * Atomically enter a state, verifying current state. - * - * @param current current state. null means "no check" - * @param next next state - * @throws IllegalStateException if the current state is not as expected - */ - protected synchronized final void enterState(DestState current, - DestState next) - throws IllegalStateException { - verifyState(current); - LOG.debug("{}: entering state {}", this, next); - state = next; - } - - /** - * Verify that the block is in the declared state. - * - * @param expected expected state. - * @throws IllegalStateException if the DataBlock is in the wrong state - */ - protected final void verifyState(DestState expected) - throws IllegalStateException { - if (expected != null && state != expected) { - throw new IllegalStateException("Expected stream state " + expected - + " -but actual state is " + state + " in " + this); - } - } - - /** - * Current state. - * - * @return the current state. - */ - final DestState getState() { - return state; - } - - /** - * Return the current data size. - * - * @return the size of the data. - */ - public abstract int dataSize(); - - /** - * Predicate to verify that the block has the capacity to write - * the given set of bytes. - * - * @param bytes number of bytes desired to be written. - * @return true if there is enough space. - */ - abstract boolean hasCapacity(long bytes); - - /** - * Predicate to check if there is data in the block. - * - * @return true if there is - */ - public boolean hasData() { - return dataSize() > 0; - } - - /** - * The remaining capacity in the block before it is full. - * - * @return the number of bytes remaining. - */ - public abstract int remainingCapacity(); - - /** - * Write a series of bytes from the buffer, from the offset. - * Returns the number of bytes written. - * Only valid in the state {@code Writing}. - * Base class verifies the state but does no writing. - * - * @param buffer buffer. - * @param offset offset. - * @param length length of write. - * @return number of bytes written. - * @throws IOException trouble - */ - public int write(byte[] buffer, int offset, int length) throws IOException { - verifyState(Writing); - Preconditions.checkArgument(buffer != null, "Null buffer"); - Preconditions.checkArgument(length >= 0, "length is negative"); - Preconditions.checkArgument(offset >= 0, "offset is negative"); - Preconditions.checkArgument( - !(buffer.length - offset < length), - "buffer shorter than amount of data to write"); - return 0; - } - - /** - * Flush the output. - * Only valid in the state {@code Writing}. - * In the base class, this is a no-op - * - * @throws IOException any IO problem. - */ - public void flush() throws IOException { - verifyState(Writing); - } - - /** - * Switch to the upload state and return a stream for uploading. - * Base class calls {@link #enterState(DestState, DestState)} to - * manage the state machine. - * - * @return the stream. - * @throws IOException trouble - */ - public BlockUploadData startUpload() throws IOException { - LOG.debug("Start datablock[{}] upload", index); - enterState(Writing, Upload); - return null; - } - - /** - * Enter the closed state. 
- * - * @return true if the class was in any other state, implying that - * the subclass should do its close operations. - */ - protected synchronized boolean enterClosedState() { - if (!state.equals(Closed)) { - enterState(null, Closed); - return true; - } else { - return false; - } - } - - @Override - public void close() throws IOException { - if (enterClosedState()) { - LOG.debug("Closed {}", this); - innerClose(); - } - } - - /** - * Inner close logic for subclasses to implement. - */ - protected void innerClose() throws IOException { - - } - - /** - * A block has been allocated. - */ - protected void blockAllocated() { - if (statistics != null) { - statistics.blockAllocated(); - } - } - - /** - * A block has been released. - */ - protected void blockReleased() { - if (statistics != null) { - statistics.blockReleased(); - } - } - - protected BlockUploadStatistics getStatistics() { - return statistics; - } - } - - // ==================================================================== - - /** - * Use byte arrays on the heap for storage. - */ - static class ArrayBlockFactory extends BlockFactory { - - ArrayBlockFactory(String keyToBufferDir, Configuration conf) { - super(keyToBufferDir, conf); - } - - @Override - public DataBlock create(long index, int limit, - BlockUploadStatistics statistics) - throws IOException { - return new ByteArrayBlock(0, limit, statistics); - } - - } - - static class DataBlockByteArrayOutputStream extends ByteArrayOutputStream { - - DataBlockByteArrayOutputStream(int size) { - super(size); - } - - /** - * InputStream backed by the internal byte array. - * - * @return ByteArrayInputStream instance. - */ - ByteArrayInputStream getInputStream() { - ByteArrayInputStream bin = new ByteArrayInputStream(this.buf, 0, count); - this.reset(); - this.buf = null; - return bin; - } - } - - /** - * Stream to memory via a {@code ByteArrayOutputStream}. - *

- * It can consume a lot of heap space - * proportional to the mismatch between writes to the stream and - * the JVM-wide upload bandwidth to a Store's endpoint. - * The memory consumption can be limited by tuning the filesystem settings - * to restrict the number of queued/active uploads. - */ - - static class ByteArrayBlock extends DataBlock { - private DataBlockByteArrayOutputStream buffer; - private final int limit; - // cache data size so that it is consistent after the buffer is reset. - private Integer dataSize; - - ByteArrayBlock(long index, - int limit, - BlockUploadStatistics statistics) { - super(index, statistics); - this.limit = limit; - this.buffer = new DataBlockByteArrayOutputStream(limit); - blockAllocated(); - } - - /** - * Get the amount of data; if there is no buffer then the size is 0. - * - * @return the amount of data available to upload. - */ - @Override - public int dataSize() { - return dataSize != null ? dataSize : buffer.size(); - } - - @Override - public BlockUploadData startUpload() throws IOException { - super.startUpload(); - dataSize = buffer.size(); - ByteArrayInputStream bufferData = buffer.getInputStream(); - buffer = null; - return new BlockUploadData(bufferData); - } - - @Override - boolean hasCapacity(long bytes) { - return dataSize() + bytes <= limit; - } - - @Override - public int remainingCapacity() { - return limit - dataSize(); - } - - @Override - public int write(byte[] b, int offset, int len) throws IOException { - super.write(b, offset, len); - int written = Math.min(remainingCapacity(), len); - buffer.write(b, offset, written); - return written; - } - - @Override - protected void innerClose() { - buffer = null; - blockReleased(); - } - - @Override - public String toString() { - return "ByteArrayBlock{" - + "index=" + index + - ", state=" + getState() + - ", limit=" + limit + - ", dataSize=" + dataSize + - '}'; - } - } - - // ==================================================================== - - /** - * Stream via Direct ByteBuffers; these are allocated off heap - * via {@link DirectBufferPool}. - */ - - static class ByteBufferBlockFactory extends BlockFactory { - - private final DirectBufferPool bufferPool = new DirectBufferPool(); - private final AtomicInteger buffersOutstanding = new AtomicInteger(0); - - ByteBufferBlockFactory(String keyToBufferDir, Configuration conf) { - super(keyToBufferDir, conf); - } - - @Override public ByteBufferBlock create(long index, int limit, - BlockUploadStatistics statistics) - throws IOException { - return new ByteBufferBlock(index, limit, statistics); - } - - private ByteBuffer requestBuffer(int limit) { - LOG.debug("Requesting buffer of size {}", limit); - buffersOutstanding.incrementAndGet(); - return bufferPool.getBuffer(limit); - } - - private void releaseBuffer(ByteBuffer buffer) { - LOG.debug("Releasing buffer"); - bufferPool.returnBuffer(buffer); - buffersOutstanding.decrementAndGet(); - } - - /** - * Get count of outstanding buffers. - * - * @return the current buffer count. - */ - public int getOutstandingBufferCount() { - return buffersOutstanding.get(); - } - - @Override - public String toString() { - return "ByteBufferBlockFactory{" - + "buffersOutstanding=" + buffersOutstanding + - '}'; - } - - /** - * A DataBlock which requests a buffer from pool on creation; returns - * it when it is closed. - */ - class ByteBufferBlock extends DataBlock { - private ByteBuffer blockBuffer; - private final int bufferSize; - // cache data size so that it is consistent after the buffer is reset. 
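To put that in numbers: with blocks sized to the common 8 MB ABFS write buffer (an assumption here, set via fs.azure.write.request.size) and the default cap of 20 queued-or-active blocks per stream (BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT, also removed later in this patch), a single stream writing faster than it uploads could pin roughly 160 MB of heap, which is why the disk and bytebuffer options exist.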
- private Integer dataSize; - - /** - * Instantiate. This will request a ByteBuffer of the desired size. - * - * @param index block index. - * @param bufferSize buffer size. - * @param statistics statistics to update. - */ - ByteBufferBlock(long index, - int bufferSize, - BlockUploadStatistics statistics) { - super(index, statistics); - this.bufferSize = bufferSize; - this.blockBuffer = requestBuffer(bufferSize); - blockAllocated(); - } - - /** - * Get the amount of data; if there is no buffer then the size is 0. - * - * @return the amount of data available to upload. - */ - @Override public int dataSize() { - return dataSize != null ? dataSize : bufferCapacityUsed(); - } - - @Override - public BlockUploadData startUpload() throws IOException { - super.startUpload(); - dataSize = bufferCapacityUsed(); - // set the buffer up from reading from the beginning - blockBuffer.limit(blockBuffer.position()); - blockBuffer.position(0); - return new BlockUploadData( - new ByteBufferInputStream(dataSize, blockBuffer)); - } - - @Override - public boolean hasCapacity(long bytes) { - return bytes <= remainingCapacity(); - } - - @Override - public int remainingCapacity() { - return blockBuffer != null ? blockBuffer.remaining() : 0; - } - - private int bufferCapacityUsed() { - return blockBuffer.capacity() - blockBuffer.remaining(); - } - - @Override - public int write(byte[] b, int offset, int len) throws IOException { - super.write(b, offset, len); - int written = Math.min(remainingCapacity(), len); - blockBuffer.put(b, offset, written); - return written; - } - - /** - * Closing the block will release the buffer. - */ - @Override - protected void innerClose() { - if (blockBuffer != null) { - blockReleased(); - releaseBuffer(blockBuffer); - blockBuffer = null; - } - } - - @Override - public String toString() { - return "ByteBufferBlock{" - + "index=" + index + - ", state=" + getState() + - ", dataSize=" + dataSize() + - ", limit=" + bufferSize + - ", remainingCapacity=" + remainingCapacity() + - '}'; - } - - /** - * Provide an input stream from a byte buffer; supporting - * {@link #mark(int)}, which is required to enable replay of failed - * PUT attempts. - */ - class ByteBufferInputStream extends InputStream { - - private final int size; - private ByteBuffer byteBuffer; - - ByteBufferInputStream(int size, - ByteBuffer byteBuffer) { - LOG.debug("Creating ByteBufferInputStream of size {}", size); - this.size = size; - this.byteBuffer = byteBuffer; - } - - /** - * After the stream is closed, set the local reference to the byte - * buffer to null; this guarantees that future attempts to use - * stream methods will fail. - */ - @Override - public synchronized void close() { - LOG.debug("ByteBufferInputStream.close() for {}", - ByteBufferBlock.super.toString()); - byteBuffer = null; - } - - /** - * Verify that the stream is open. 
- * - * @throws IOException if the stream is closed - */ - private void verifyOpen() throws IOException { - if (byteBuffer == null) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - } - } - - public synchronized int read() throws IOException { - if (available() > 0) { - return byteBuffer.get() & 0xFF; - } else { - return -1; - } - } - - @Override - public synchronized long skip(long offset) throws IOException { - verifyOpen(); - long newPos = position() + offset; - if (newPos < 0) { - throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); - } - if (newPos > size) { - throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); - } - byteBuffer.position((int) newPos); - return newPos; - } - - @Override - public synchronized int available() { - Preconditions.checkState(byteBuffer != null, - FSExceptionMessages.STREAM_IS_CLOSED); - return byteBuffer.remaining(); - } - - /** - * Get the current buffer position. - * - * @return the buffer position - */ - public synchronized int position() { - return byteBuffer.position(); - } - - /** - * Check if there is data left. - * - * @return true if there is data remaining in the buffer. - */ - public synchronized boolean hasRemaining() { - return byteBuffer.hasRemaining(); - } - - @Override - public synchronized void mark(int readlimit) { - LOG.debug("mark at {}", position()); - byteBuffer.mark(); - } - - @Override - public synchronized void reset() throws IOException { - LOG.debug("reset"); - byteBuffer.reset(); - } - - @Override - public boolean markSupported() { - return true; - } - - /** - * Read in data. - * - * @param b destination buffer. - * @param offset offset within the buffer. - * @param length length of bytes to read. - * @throws EOFException if the position is negative - * @throws IndexOutOfBoundsException if there isn't space for the - * amount of data requested. - * @throws IllegalArgumentException other arguments are invalid. - */ - @SuppressWarnings("NullableProblems") - public synchronized int read(byte[] b, int offset, int length) - throws IOException { - Preconditions.checkArgument(length >= 0, "length is negative"); - Preconditions.checkArgument(b != null, "Null buffer"); - if (b.length - offset < length) { - throw new IndexOutOfBoundsException( - FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER - + ": request length =" + length - + ", with offset =" + offset - + "; buffer capacity =" + (b.length - offset)); - } - verifyOpen(); - if (!hasRemaining()) { - return -1; - } - - int toRead = Math.min(length, available()); - byteBuffer.get(b, offset, toRead); - return toRead; - } - - @Override - public String toString() { - final StringBuilder sb = new StringBuilder( - "ByteBufferInputStream{"); - sb.append("size=").append(size); - ByteBuffer buf = this.byteBuffer; - if (buf != null) { - sb.append(", available=").append(buf.remaining()); - } - sb.append(", ").append(ByteBufferBlock.super.toString()); - sb.append('}'); - return sb.toString(); - } - } - } - } - - // ==================================================================== - - /** - * Buffer blocks to disk. - */ - static class DiskBlockFactory extends BlockFactory { - - private LocalDirAllocator directoryAllocator; - - DiskBlockFactory(String keyToBufferDir, Configuration conf) { - super(keyToBufferDir, conf); - String bufferDir = conf.get(keyToBufferDir) != null - ? keyToBufferDir : HADOOP_TMP_DIR; - directoryAllocator = new LocalDirAllocator(bufferDir); - } - - /** - * Create a temp file and a {@link DiskBlock} instance to manage it. 
- * - * @param index block index. - * @param limit limit of the block. - * @param statistics statistics to update. - * @return the new block. - * @throws IOException IO problems - */ - @Override - public DataBlock create(long index, - int limit, - BlockUploadStatistics statistics) - throws IOException { - File destFile = createTmpFileForWrite(String.format("datablock-%04d-", - index), - limit, getConf()); - - return new DiskBlock(destFile, limit, index, statistics); - } - - /** - * Demand create the directory allocator, then create a temporary file. - * This does not mark the file for deletion when a process exits. - * {@link LocalDirAllocator#createTmpFileForWrite(String, long, Configuration)}. - * - * @param pathStr prefix for the temporary file. - * @param size the size of the file that is going to be written. - * @param conf the Configuration object. - * @return a unique temporary file. - * @throws IOException IO problems - */ - File createTmpFileForWrite(String pathStr, long size, - Configuration conf) throws IOException { - Path path = directoryAllocator.getLocalPathForWrite(pathStr, - size, conf); - File dir = new File(path.getParent().toUri().getPath()); - String prefix = path.getName(); - // create a temp file on this directory - return File.createTempFile(prefix, null, dir); - } - } - - /** - * Stream to a file. - * This will stop at the limit; the caller is expected to create a new block. - */ - static class DiskBlock extends DataBlock { - - private int bytesWritten; - private final File bufferFile; - private final int limit; - private BufferedOutputStream out; - private final AtomicBoolean closed = new AtomicBoolean(false); - - DiskBlock(File bufferFile, - int limit, - long index, - BlockUploadStatistics statistics) - throws FileNotFoundException { - super(index, statistics); - this.limit = limit; - this.bufferFile = bufferFile; - blockAllocated(); - out = new BufferedOutputStream(new FileOutputStream(bufferFile)); - } - - @Override public int dataSize() { - return bytesWritten; - } - - @Override - boolean hasCapacity(long bytes) { - return dataSize() + bytes <= limit; - } - - @Override public int remainingCapacity() { - return limit - bytesWritten; - } - - @Override - public int write(byte[] b, int offset, int len) throws IOException { - super.write(b, offset, len); - int written = Math.min(remainingCapacity(), len); - out.write(b, offset, written); - bytesWritten += written; - return written; - } - - @Override - public BlockUploadData startUpload() throws IOException { - super.startUpload(); - try { - out.flush(); - } finally { - out.close(); - out = null; - } - return new BlockUploadData(bufferFile); - } - - /** - * The close operation will delete the destination file if it still - * exists. - * - * @throws IOException IO problems - */ - @SuppressWarnings("UnnecessaryDefault") - @Override - protected void innerClose() throws IOException { - final DestState state = getState(); - LOG.debug("Closing {}", this); - switch (state) { - case Writing: - if (bufferFile.exists()) { - // file was not uploaded - LOG.debug("Block[{}]: Deleting buffer file as upload did not start", - index); - closeBlock(); - } - break; - - case Upload: - LOG.debug("Block[{}]: Buffer file {} exists —close upload stream", - index, bufferFile); - break; - - case Closed: - closeBlock(); - break; - - default: - // this state can never be reached, but checkstyle complains, so - // it is here. - } - } - - /** - * Flush operation will flush to disk. 
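Because DiskBlockFactory above resolves its directory through LocalDirAllocator, the buffer location behaves like any Hadoop local-dir setting; a hedged example (paths are placeholders, and LocalDirAllocator can generally spread files over a comma-separated list of directories):

```java
import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
// Buffer upload blocks on two local volumes instead of hadoop.tmp.dir.
conf.set("fs.azure.buffer.dir", "/data1/abfs,/data2/abfs");
```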
- * - * @throws IOException IOE raised on FileOutputStream - */ - @Override public void flush() throws IOException { - super.flush(); - out.flush(); - } - - @Override - public String toString() { - String sb = "FileBlock{" - + "index=" + index - + ", destFile=" + bufferFile + - ", state=" + getState() + - ", dataSize=" + dataSize() + - ", limit=" + limit + - '}'; - return sb; - } - - /** - * Close the block. - * This will delete the block's buffer file if the block has - * not previously been closed. - */ - void closeBlock() { - LOG.debug("block[{}]: closeBlock()", index); - if (!closed.getAndSet(true)) { - blockReleased(); - if (!bufferFile.delete() && bufferFile.exists()) { - LOG.warn("delete({}) returned false", - bufferFile.getAbsoluteFile()); - } - } else { - LOG.debug("block[{}]: skipping re-entrant closeBlock()", index); - } - } - } -} diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 4731dce3f28..e1ddedd540b 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -2300,13 +2300,6 @@ - - fs.azure.buffer.dir - ${hadoop.tmp.dir}/abfs - Directory path for buffer files needed to upload data blocks - in AbfsOutputStream. - - fs.AbstractFileSystem.gs.impl com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestDataBlocks.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestDataBlocks.java deleted file mode 100644 index 5c2f1b47425..00000000000 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestDataBlocks.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.store; - -import java.io.IOException; -import java.util.Random; - -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.test.LambdaTestUtils; - -import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_ARRAY; -import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_DISK; -import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BYTEBUFFER; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -/** - * UTs to test {@link DataBlocks} functionalities. 
- */ -public class TestDataBlocks { - private final Configuration configuration = new Configuration(); - private static final int ONE_KB = 1024; - private static final Logger LOG = - LoggerFactory.getLogger(TestDataBlocks.class); - - /** - * Test to verify different DataBlocks factories, different operations. - */ - @Test - public void testDataBlocksFactory() throws Exception { - testCreateFactory(DATA_BLOCKS_BUFFER_DISK); - testCreateFactory(DATA_BLOCKS_BUFFER_ARRAY); - testCreateFactory(DATA_BLOCKS_BYTEBUFFER); - } - - /** - * Verify creation of a data block factory and it's operations. - * - * @param nameOfFactory Name of the DataBlock factory to be created. - * @throws IOException Throw IOE in case of failure while creating a block. - */ - public void testCreateFactory(String nameOfFactory) throws Exception { - LOG.info("Testing: {}", nameOfFactory); - DataBlocks.BlockFactory diskFactory = - DataBlocks.createFactory("Dir", configuration, nameOfFactory); - - DataBlocks.DataBlock dataBlock = diskFactory.create(0, ONE_KB, null); - assertWriteBlock(dataBlock); - assertToByteArray(dataBlock); - assertCloseBlock(dataBlock); - } - - /** - * Verify Writing of a dataBlock. - * - * @param dataBlock DataBlock to be tested. - * @throws IOException Throw Exception in case of failures. - */ - private void assertWriteBlock(DataBlocks.DataBlock dataBlock) - throws IOException { - byte[] oneKbBuff = new byte[ONE_KB]; - new Random().nextBytes(oneKbBuff); - dataBlock.write(oneKbBuff, 0, ONE_KB); - // Verify DataBlock state is at Writing. - dataBlock.verifyState(DataBlocks.DataBlock.DestState.Writing); - // Verify that the DataBlock has data written. - assertTrue("Expected Data block to have data", dataBlock.hasData()); - // Verify the size of data. - assertEquals("Mismatch in data size in block", dataBlock.dataSize(), - ONE_KB); - // Verify that no capacity is left in the data block to write more. - assertFalse("Expected the data block to have no capacity to write 1 byte " - + "of data", dataBlock.hasCapacity(1)); - } - - /** - * Verify the Conversion of Data blocks into byte[]. - * - * @param dataBlock data block to be tested. - * @throws Exception Throw Exception in case of failures. - */ - private void assertToByteArray(DataBlocks.DataBlock dataBlock) - throws Exception { - DataBlocks.BlockUploadData blockUploadData = dataBlock.startUpload(); - // Verify that the current state is in upload. - dataBlock.verifyState(DataBlocks.DataBlock.DestState.Upload); - // Convert the DataBlock upload to byteArray. - byte[] bytesWritten = blockUploadData.toByteArray(); - // Verify that we can call toByteArray() more than once and gives the - // same byte[]. - assertEquals("Mismatch in byteArray provided by toByteArray() the second " - + "time", bytesWritten, blockUploadData.toByteArray()); - IOUtils.close(blockUploadData); - // Verify that after closing blockUploadData, we can't call toByteArray(). - LambdaTestUtils.intercept(IllegalStateException.class, - "Block is closed", - "Expected to throw IllegalStateException.java after closing " - + "blockUploadData and trying to call toByteArray()", - () -> { - blockUploadData.toByteArray(); - }); - } - - /** - * Verify the close() of data blocks. - * - * @param dataBlock data block to be tested. - * @throws IOException Throw Exception in case of failures. - */ - private void assertCloseBlock(DataBlocks.DataBlock dataBlock) - throws IOException { - dataBlock.close(); - // Verify that the current state is in Closed. 
- dataBlock.verifyState(DataBlocks.DataBlock.DestState.Closed); - } -} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index b4c8b3eff32..91274289f54 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -90,7 +90,6 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsSource; -import org.apache.hadoop.fs.store.DataBlocks; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.UserGroupInformation; @@ -102,11 +101,6 @@ import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL; import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*; -import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER; -import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS; -import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR; -import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT; -import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel; @@ -131,13 +125,6 @@ public class AzureBlobFileSystem extends FileSystem private TracingHeaderFormat tracingHeaderFormat; private Listener listener; - /** Name of blockFactory to be used by AbfsOutputStream. */ - private String blockOutputBuffer; - /** BlockFactory instance to be used. */ - private DataBlocks.BlockFactory blockFactory; - /** Maximum Active blocks per OutputStream. */ - private int blockOutputActiveBlocks; - @Override public void initialize(URI uri, Configuration configuration) throws IOException { @@ -149,33 +136,8 @@ public void initialize(URI uri, Configuration configuration) this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); abfsCounters = new AbfsCountersImpl(uri); - // name of the blockFactory to be used. - this.blockOutputBuffer = configuration.getTrimmed(DATA_BLOCKS_BUFFER, - DATA_BLOCKS_BUFFER_DEFAULT); - // blockFactory used for this FS instance. - this.blockFactory = - DataBlocks.createFactory(FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR, - configuration, blockOutputBuffer); - this.blockOutputActiveBlocks = - configuration.getInt(FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS, - BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT); - if (blockOutputActiveBlocks < 1) { - blockOutputActiveBlocks = 1; - } - - // AzureBlobFileSystemStore with params in builder. 
- AzureBlobFileSystemStore.AzureBlobFileSystemStoreBuilder - systemStoreBuilder = - new AzureBlobFileSystemStore.AzureBlobFileSystemStoreBuilder() - .withUri(uri) - .withSecureScheme(this.isSecureScheme()) - .withConfiguration(configuration) - .withAbfsCounters(abfsCounters) - .withBlockFactory(blockFactory) - .withBlockOutputActiveBlocks(blockOutputActiveBlocks) - .build(); - - this.abfsStore = new AzureBlobFileSystemStore(systemStoreBuilder); + this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(), + configuration, abfsCounters); LOG.trace("AzureBlobFileSystemStore init complete"); final AbfsConfiguration abfsConfiguration = abfsStore diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 3e9c2b75242..de6f676bab5 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -51,8 +51,6 @@ import java.util.Set; import java.util.WeakHashMap; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; @@ -122,12 +120,8 @@ import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.fs.store.DataBlocks; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.BlockingThreadPoolExecutorService; -import org.apache.hadoop.util.SemaphoredDelegatingExecutor; -import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.http.client.utils.URIBuilder; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_EQUALS; @@ -178,23 +172,10 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { */ private Set appendBlobDirSet; - /** BlockFactory being used by this instance.*/ - private DataBlocks.BlockFactory blockFactory; - /** Number of active data blocks per AbfsOutputStream */ - private int blockOutputActiveBlocks; - /** Bounded ThreadPool for this instance. */ - private ExecutorService boundedThreadPool; - - /** - * FileSystem Store for {@link AzureBlobFileSystem} for Abfs operations. - * Built using the {@link AzureBlobFileSystemStoreBuilder} with parameters - * required. - * @param abfsStoreBuilder Builder for AzureBlobFileSystemStore. - * @throws IOException Throw IOE in case of failure during constructing. 
- */ - public AzureBlobFileSystemStore( - AzureBlobFileSystemStoreBuilder abfsStoreBuilder) throws IOException { - this.uri = abfsStoreBuilder.uri; + public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, + Configuration configuration, + AbfsCounters abfsCounters) throws IOException { + this.uri = uri; String[] authorityParts = authorityParts(uri); final String fileSystemName = authorityParts[0]; final String accountName = authorityParts[1]; @@ -202,7 +183,7 @@ public AzureBlobFileSystemStore( leaseRefs = Collections.synchronizedMap(new WeakHashMap<>()); try { - this.abfsConfiguration = new AbfsConfiguration(abfsStoreBuilder.configuration, accountName); + this.abfsConfiguration = new AbfsConfiguration(configuration, accountName); } catch (IllegalAccessException exception) { throw new FileSystemOperationUnhandledException(exception); } @@ -232,16 +213,16 @@ public AzureBlobFileSystemStore( updateInfiniteLeaseDirs(); this.authType = abfsConfiguration.getAuthType(accountName); boolean usingOauth = (authType == AuthType.OAuth); - boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : abfsStoreBuilder.isSecureScheme; + boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme; this.abfsPerfTracker = new AbfsPerfTracker(fileSystemName, accountName, this.abfsConfiguration); - this.abfsCounters = abfsStoreBuilder.abfsCounters; + this.abfsCounters = abfsCounters; initializeClient(uri, fileSystemName, accountName, useHttps); final Class identityTransformerClass = - abfsStoreBuilder.configuration.getClass(FS_AZURE_IDENTITY_TRANSFORM_CLASS, IdentityTransformer.class, + configuration.getClass(FS_AZURE_IDENTITY_TRANSFORM_CLASS, IdentityTransformer.class, IdentityTransformerInterface.class); try { this.identityTransformer = - identityTransformerClass.getConstructor(Configuration.class).newInstance(abfsStoreBuilder.configuration); + identityTransformerClass.getConstructor(Configuration.class).newInstance(configuration); } catch (IllegalAccessException | InstantiationException | IllegalArgumentException | InvocationTargetException | NoSuchMethodException e) { throw new IOException(e); } @@ -255,13 +236,6 @@ public AzureBlobFileSystemStore( this.appendBlobDirSet = new HashSet<>(Arrays.asList( abfsConfiguration.getAppendBlobDirs().split(AbfsHttpConstants.COMMA))); } - this.blockFactory = abfsStoreBuilder.blockFactory; - this.blockOutputActiveBlocks = abfsStoreBuilder.blockOutputActiveBlocks; - this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance( - abfsConfiguration.getWriteMaxConcurrentRequestCount(), - abfsConfiguration.getMaxWriteRequestsToQueue(), - 10L, TimeUnit.SECONDS, - "abfs-bounded"); } /** @@ -298,10 +272,6 @@ public void close() throws IOException { } try { Futures.allAsList(futures).get(); - // shutdown the threadPool and set it to null. 
- HadoopExecutors.shutdown(boundedThreadPool, LOG, - 30, TimeUnit.SECONDS); - boundedThreadPool = null; } catch (InterruptedException e) { LOG.error("Interrupted freeing leases", e); Thread.currentThread().interrupt(); @@ -528,7 +498,7 @@ public void deleteFilesystem(TracingContext tracingContext) public OutputStream createFile(final Path path, final FileSystem.Statistics statistics, final boolean overwrite, final FsPermission permission, final FsPermission umask, - TracingContext tracingContext) throws IOException { + TracingContext tracingContext) throws AzureBlobFileSystemException { try (AbfsPerfInfo perfInfo = startTracking("createFile", "createPath")) { boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext); LOG.debug("createFile filesystem: {} path: {} overwrite: {} permission: {} umask: {} isNamespaceEnabled: {}", @@ -579,14 +549,12 @@ public OutputStream createFile(final Path path, AbfsLease lease = maybeCreateLease(relativePath, tracingContext); return new AbfsOutputStream( - populateAbfsOutputStreamContext( - isAppendBlob, - lease, - client, - statistics, - relativePath, - 0, - tracingContext)); + client, + statistics, + relativePath, + 0, + populateAbfsOutputStreamContext(isAppendBlob, lease), + tracingContext); } } @@ -660,29 +628,8 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa return op; } - /** - * Method to populate AbfsOutputStreamContext with different parameters to - * be used to construct {@link AbfsOutputStream}. - * - * @param isAppendBlob is Append blob support enabled? - * @param lease instance of AbfsLease for this AbfsOutputStream. - * @param client AbfsClient. - * @param statistics FileSystem statistics. - * @param path Path for AbfsOutputStream. - * @param position Position or offset of the file being opened, set to 0 - * when creating a new file, but needs to be set for APPEND - * calls on the same file. - * @param tracingContext instance of TracingContext for this AbfsOutputStream. - * @return AbfsOutputStreamContext instance with the desired parameters. 
- */ - private AbfsOutputStreamContext populateAbfsOutputStreamContext( - boolean isAppendBlob, - AbfsLease lease, - AbfsClient client, - FileSystem.Statistics statistics, - String path, - long position, - TracingContext tracingContext) { + private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob, + AbfsLease lease) { int bufferSize = abfsConfiguration.getWriteBufferSize(); if (isAppendBlob && bufferSize > FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE) { bufferSize = FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE; @@ -697,15 +644,6 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext( .withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount()) .withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue()) .withLease(lease) - .withBlockFactory(blockFactory) - .withBlockOutputActiveBlocks(blockOutputActiveBlocks) - .withClient(client) - .withPosition(position) - .withFsStatistics(statistics) - .withPath(path) - .withExecutorService(new SemaphoredDelegatingExecutor(boundedThreadPool, - blockOutputActiveBlocks, true)) - .withTracingContext(tracingContext) .build(); } @@ -817,7 +755,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext( public OutputStream openFileForWrite(final Path path, final FileSystem.Statistics statistics, final boolean overwrite, - TracingContext tracingContext) throws IOException { + TracingContext tracingContext) throws AzureBlobFileSystemException { try (AbfsPerfInfo perfInfo = startTracking("openFileForWrite", "getPathStatus")) { LOG.debug("openFileForWrite filesystem: {} path: {} overwrite: {}", client.getFileSystem(), @@ -853,14 +791,12 @@ public OutputStream openFileForWrite(final Path path, AbfsLease lease = maybeCreateLease(relativePath, tracingContext); return new AbfsOutputStream( - populateAbfsOutputStreamContext( - isAppendBlob, - lease, - client, - statistics, - relativePath, - offset, - tracingContext)); + client, + statistics, + relativePath, + offset, + populateAbfsOutputStreamContext(isAppendBlob, lease), + tracingContext); } } @@ -1808,57 +1744,6 @@ public String toString() { } } - /** - * A builder class for AzureBlobFileSystemStore. 
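Immediately below is the builder this javadoc announces; its intended call pattern appears in the AzureBlobFileSystem.initialize() hunk earlier in this patch and is repeated here as a compact sketch (all argument values are placeholders):

```java
AzureBlobFileSystemStore store = new AzureBlobFileSystemStore(
    new AzureBlobFileSystemStore.AzureBlobFileSystemStoreBuilder()
        .withUri(uri)
        .withSecureScheme(true)
        .withConfiguration(conf)
        .withAbfsCounters(abfsCounters)
        .withBlockFactory(blockFactory)
        .withBlockOutputActiveBlocks(20)
        .build());
```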
- */ - public static final class AzureBlobFileSystemStoreBuilder { - - private URI uri; - private boolean isSecureScheme; - private Configuration configuration; - private AbfsCounters abfsCounters; - private DataBlocks.BlockFactory blockFactory; - private int blockOutputActiveBlocks; - - public AzureBlobFileSystemStoreBuilder withUri(URI value) { - this.uri = value; - return this; - } - - public AzureBlobFileSystemStoreBuilder withSecureScheme(boolean value) { - this.isSecureScheme = value; - return this; - } - - public AzureBlobFileSystemStoreBuilder withConfiguration( - Configuration value) { - this.configuration = value; - return this; - } - - public AzureBlobFileSystemStoreBuilder withAbfsCounters( - AbfsCounters value) { - this.abfsCounters = value; - return this; - } - - public AzureBlobFileSystemStoreBuilder withBlockFactory( - DataBlocks.BlockFactory value) { - this.blockFactory = value; - return this; - } - - public AzureBlobFileSystemStoreBuilder withBlockOutputActiveBlocks( - int value) { - this.blockOutputActiveBlocks = value; - return this; - } - - public AzureBlobFileSystemStoreBuilder build() { - return this; - } - } - @VisibleForTesting AbfsClient getClient() { return this.client; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 12beb5a9bba..4a2c5951bd5 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -56,37 +56,6 @@ public final class ConfigurationKeys { public static final String AZURE_WRITE_MAX_CONCURRENT_REQUESTS = "fs.azure.write.max.concurrent.requests"; public static final String AZURE_WRITE_MAX_REQUESTS_TO_QUEUE = "fs.azure.write.max.requests.to.queue"; public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size"; - - /** - * Maximum Number of blocks a single output stream can have - * active (uploading, or queued to the central FileSystem - * instance's pool of queued operations. - * This stops a single stream overloading the shared thread pool. - * {@value} - *
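The enforcement mechanism for this cap, visible in the store code removed earlier in this patch, wraps the shared bounded pool in a per-stream semaphored executor; a sketch, where `boundedThreadPool` and `activeBlocks` are assumed from that surrounding code:

```java
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;

// Each output stream submits uploads through its own semaphored view of
// the shared pool, so one stream holds at most 'activeBlocks' permits.
ExecutorService perStreamPool = new SemaphoredDelegatingExecutor(
    boundedThreadPool,  // shared pool owned by AzureBlobFileSystemStore
    activeBlocks,       // fs.azure.block.upload.active.blocks
    true);              // fair semaphore: waiting submitters go in FIFO order
```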

- * Default is {@link FileSystemConfigurations#BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT} - */ - public static final String FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS = - "fs.azure.block.upload.active.blocks"; - - /** - * Buffer directory path for uploading AbfsOutputStream data blocks. - * Value: {@value} - */ - public static final String FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR = - "fs.azure.buffer.dir"; - - /** - * What data block buffer to use. - *
- * Options include: "disk"(Default), "array", and "bytebuffer". - *
- * Default is {@link FileSystemConfigurations#DATA_BLOCKS_BUFFER_DEFAULT}. - * Value: {@value} - */ - public static final String DATA_BLOCKS_BUFFER = - "fs.azure.data.blocks.buffer"; - /** If the data size written by Hadoop app is small, i.e. data size : * (a) before any of HFlush/HSync call is made or * (b) between 2 HFlush/Hsync API calls diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index f58c61e8908..a1de9dfc0ac 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -115,23 +115,5 @@ public final class FileSystemConfigurations { public static final int STREAM_ID_LEN = 12; public static final boolean DEFAULT_ENABLE_ABFS_LIST_ITERATOR = true; - /** - * Limit of queued block upload operations before writes - * block for an OutputStream. Value: {@value} - */ - public static final int BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT = 20; - - /** - * Buffer blocks to disk. - * Capacity is limited to available disk space. - */ - public static final String DATA_BLOCKS_BUFFER_DISK = "disk"; - - /** - * Default buffer option: {@value}. - */ - public static final String DATA_BLOCKS_BUFFER_DEFAULT = - DATA_BLOCKS_BUFFER_DISK; - private FileSystemConfigurations() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 61160f92bdf..91b068a78c9 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -20,20 +20,24 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InterruptedIOException; import java.io.OutputStream; import java.net.HttpURLConnection; +import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.UUID; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; -import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; -import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; @@ -43,9 +47,10 @@ import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken; import org.apache.hadoop.fs.azurebfs.utils.Listener; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.statistics.DurationTracker; import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsSource; -import org.apache.hadoop.fs.store.DataBlocks; +import 
org.apache.hadoop.io.ElasticByteBufferPool; import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.StreamCapabilities; @@ -58,7 +63,6 @@ import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE; import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_CLOSE_MODE; import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_MODE; -import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkState; /** * The BlobFsOutputStream for Rest AbfsClient. @@ -68,12 +72,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable, private final AbfsClient client; private final String path; - /** The position in the file being uploaded, where the next block would be - * uploaded. - * This is used in constructing the AbfsClient requests to ensure that, - * even if blocks are uploaded out of order, they are reassembled in - * correct order. - * */ private long position; private boolean closed; private boolean supportFlush; @@ -93,6 +91,8 @@ public class AbfsOutputStream extends OutputStream implements Syncable, private final int maxRequestsThatCanBeQueued; private ConcurrentLinkedDeque writeOperations; + private final ThreadPoolExecutor threadExecutor; + private final ExecutorCompletionService completionService; // SAS tokens can be re-used until they expire private CachedSASToken cachedSasToken; @@ -103,6 +103,15 @@ public class AbfsOutputStream extends OutputStream implements Syncable, private AbfsLease lease; private String leaseId; + /** + * Queue storing buffers with the size of the Azure block ready for + * reuse. The pool allows reusing the blocks instead of allocating new + * blocks. After the data is sent to the service, the buffer is returned + * back to the queue + */ + private ElasticByteBufferPool byteBufferPool + = new ElasticByteBufferPool(); + private final Statistics statistics; private final AbfsOutputStreamStatistics outputStreamStatistics; private IOStatistics ioStatistics; @@ -110,27 +119,17 @@ public class AbfsOutputStream extends OutputStream implements Syncable, private static final Logger LOG = LoggerFactory.getLogger(AbfsOutputStream.class); - /** Factory for blocks. */ - private final DataBlocks.BlockFactory blockFactory; - - /** Current data block. Null means none currently active. */ - private DataBlocks.DataBlock activeBlock; - - /** Count of blocks uploaded. */ - private long blockCount = 0; - - /** The size of a single block. */ - private final int blockSize; - - /** Executor service to carry out the parallel upload requests. 
*/ - private final ListeningExecutorService executorService; - - public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext) - throws IOException { - this.client = abfsOutputStreamContext.getClient(); - this.statistics = abfsOutputStreamContext.getStatistics(); - this.path = abfsOutputStreamContext.getPath(); - this.position = abfsOutputStreamContext.getPosition(); + public AbfsOutputStream( + final AbfsClient client, + final Statistics statistics, + final String path, + final long position, + AbfsOutputStreamContext abfsOutputStreamContext, + TracingContext tracingContext) { + this.client = client; + this.statistics = statistics; + this.path = path; + this.position = position; this.closed = false; this.supportFlush = abfsOutputStreamContext.isEnableFlush(); this.disableOutputStreamFlush = abfsOutputStreamContext @@ -141,6 +140,7 @@ public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext) this.lastError = null; this.lastFlushOffset = 0; this.bufferSize = abfsOutputStreamContext.getWriteBufferSize(); + this.buffer = byteBufferPool.getBuffer(false, bufferSize).array(); this.bufferIndex = 0; this.numOfAppendsToServerSinceLastFlush = 0; this.writeOperations = new ConcurrentLinkedDeque<>(); @@ -157,20 +157,23 @@ public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext) this.lease = abfsOutputStreamContext.getLease(); this.leaseId = abfsOutputStreamContext.getLeaseId(); - this.executorService = - MoreExecutors.listeningDecorator(abfsOutputStreamContext.getExecutorService()); + + this.threadExecutor + = new ThreadPoolExecutor(maxConcurrentRequestCount, + maxConcurrentRequestCount, + 10L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>()); + this.completionService = new ExecutorCompletionService<>(this.threadExecutor); this.cachedSasToken = new CachedSASToken( abfsOutputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds()); + if (outputStreamStatistics != null) { + this.ioStatistics = outputStreamStatistics.getIOStatistics(); + } this.outputStreamId = createOutputStreamId(); - this.tracingContext = new TracingContext(abfsOutputStreamContext.getTracingContext()); + this.tracingContext = new TracingContext(tracingContext); this.tracingContext.setStreamID(outputStreamId); this.tracingContext.setOperation(FSOperationType.WRITE); - this.ioStatistics = outputStreamStatistics.getIOStatistics(); - this.blockFactory = abfsOutputStreamContext.getBlockFactory(); - this.blockSize = bufferSize; - // create that first block. This guarantees that an open + close sequence - // writes a 0-byte entry. - createBlockIfNeeded(); } private String createOutputStreamId() { @@ -216,10 +219,10 @@ public void write(final int byteVal) throws IOException { @Override public synchronized void write(final byte[] data, final int off, final int length) throws IOException { - // validate if data is not null and index out of bounds. 
- DataBlocks.validateWriteArgs(data, off, length); maybeThrowLastError(); + Preconditions.checkArgument(data != null, "null data"); + if (off < 0 || length < 0 || length > data.length - off) { throw new IndexOutOfBoundsException(); } @@ -227,184 +230,29 @@ public synchronized void write(final byte[] data, final int off, final int lengt if (hasLease() && isLeaseFreed()) { throw new PathIOException(path, ERR_WRITE_WITHOUT_LEASE); } - DataBlocks.DataBlock block = createBlockIfNeeded(); - int written = block.write(data, off, length); - int remainingCapacity = block.remainingCapacity(); - if (written < length) { - // Number of bytes to write is more than the data block capacity, - // trigger an upload and then write on the next block. - LOG.debug("writing more data than block capacity -triggering upload"); - uploadCurrentBlock(); - // tail recursion is mildly expensive, but given buffer sizes must be MB. - // it's unlikely to recurse very deeply. - this.write(data, off + written, length - written); - } else { - if (remainingCapacity == 0) { - // the whole buffer is done, trigger an upload - uploadCurrentBlock(); + int currentOffset = off; + int writableBytes = bufferSize - bufferIndex; + int numberOfBytesToWrite = length; + + while (numberOfBytesToWrite > 0) { + if (writableBytes <= numberOfBytesToWrite) { + System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes); + bufferIndex += writableBytes; + writeCurrentBufferToService(); + currentOffset += writableBytes; + numberOfBytesToWrite = numberOfBytesToWrite - writableBytes; + } else { + System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite); + bufferIndex += numberOfBytesToWrite; + numberOfBytesToWrite = 0; } + + writableBytes = bufferSize - bufferIndex; } incrementWriteOps(); } - /** - * Demand create a destination block. - * - * @return the active block; null if there isn't one. - * @throws IOException on any failure to create - */ - private synchronized DataBlocks.DataBlock createBlockIfNeeded() - throws IOException { - if (activeBlock == null) { - blockCount++; - activeBlock = blockFactory - .create(blockCount, this.blockSize, outputStreamStatistics); - } - return activeBlock; - } - - /** - * Start an asynchronous upload of the current block. - * - * @throws IOException Problems opening the destination for upload, - * initializing the upload, or if a previous operation has failed. - */ - private synchronized void uploadCurrentBlock() throws IOException { - checkState(hasActiveBlock(), "No active block"); - LOG.debug("Writing block # {}", blockCount); - try { - uploadBlockAsync(getActiveBlock(), false, false); - } finally { - // set the block to null, so the next write will create a new block. - clearActiveBlock(); - } - } - - /** - * Upload a block of data. - * This will take the block. - * - * @param blockToUpload block to upload. 
- * @throws IOException upload failure - */ - private void uploadBlockAsync(DataBlocks.DataBlock blockToUpload, - boolean isFlush, boolean isClose) - throws IOException { - if (this.isAppendBlob) { - writeAppendBlobCurrentBufferToService(); - return; - } - if (!blockToUpload.hasData()) { - return; - } - numOfAppendsToServerSinceLastFlush++; - - final int bytesLength = blockToUpload.dataSize(); - final long offset = position; - position += bytesLength; - outputStreamStatistics.bytesToUpload(bytesLength); - outputStreamStatistics.writeCurrentBuffer(); - DataBlocks.BlockUploadData blockUploadData = blockToUpload.startUpload(); - final Future job = - executorService.submit(() -> { - AbfsPerfTracker tracker = - client.getAbfsPerfTracker(); - try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, - "writeCurrentBufferToService", "append")) { - AppendRequestParameters.Mode - mode = APPEND_MODE; - if (isFlush & isClose) { - mode = FLUSH_CLOSE_MODE; - } else if (isFlush) { - mode = FLUSH_MODE; - } - /* - * Parameters Required for an APPEND call. - * offset(here) - refers to the position in the file. - * bytesLength - Data to be uploaded from the block. - * mode - If it's append, flush or flush_close. - * leaseId - The AbfsLeaseId for this request. - */ - AppendRequestParameters reqParams = new AppendRequestParameters( - offset, 0, bytesLength, mode, false, leaseId); - AbfsRestOperation op = - client.append(path, blockUploadData.toByteArray(), reqParams, - cachedSasToken.get(), new TracingContext(tracingContext)); - cachedSasToken.update(op.getSasToken()); - perfInfo.registerResult(op.getResult()); - perfInfo.registerSuccess(true); - outputStreamStatistics.uploadSuccessful(bytesLength); - return null; - } finally { - IOUtils.close(blockUploadData); - } - }); - writeOperations.add(new WriteOperation(job, offset, bytesLength)); - - // Try to shrink the queue - shrinkWriteOperationQueue(); - } - - /** - * A method to set the lastError if an exception is caught. - * @param ex Exception caught. - * @throws IOException Throws the lastError. - */ - private void failureWhileSubmit(Exception ex) throws IOException { - if (ex instanceof AbfsRestOperationException) { - if (((AbfsRestOperationException) ex).getStatusCode() - == HttpURLConnection.HTTP_NOT_FOUND) { - throw new FileNotFoundException(ex.getMessage()); - } - } - if (ex instanceof IOException) { - lastError = (IOException) ex; - } else { - lastError = new IOException(ex); - } - throw lastError; - } - - /** - * Synchronized accessor to the active block. - * - * @return the active block; null if there isn't one. - */ - private synchronized DataBlocks.DataBlock getActiveBlock() { - return activeBlock; - } - - /** - * Predicate to query whether or not there is an active block. - * - * @return true if there is an active block. - */ - private synchronized boolean hasActiveBlock() { - return activeBlock != null; - } - - /** - * Is there an active block and is there any data in it to upload? - * - * @return true if there is some data to upload in an active block else false. - */ - private boolean hasActiveBlockDataToUpload() { - return hasActiveBlock() && getActiveBlock().hasData(); - } - - /** - * Clear the active block. - */ - private void clearActiveBlock() { - if (activeBlock != null) { - LOG.debug("Clearing active block"); - } - synchronized (this) { - activeBlock = null; - } - } - /** * Increment Write Operations. 
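 * Updates the shared FileSystem.Statistics counters when they are present.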
*/ @@ -487,6 +335,7 @@ public synchronized void close() throws IOException { try { flushInternal(true); + threadExecutor.shutdown(); } catch (IOException e) { // Problems surface in try-with-resources clauses if // the exception thrown in a close == the one already thrown @@ -503,8 +352,9 @@ public synchronized void close() throws IOException { bufferIndex = 0; closed = true; writeOperations.clear(); - if (hasActiveBlock()) { - clearActiveBlock(); + byteBufferPool = null; + if (!threadExecutor.isShutdown()) { + threadExecutor.shutdownNow(); } } LOG.debug("Closing AbfsOutputStream : {}", this); @@ -518,22 +368,19 @@ private synchronized void flushInternal(boolean isClose) throws IOException { && enableSmallWriteOptimization && (numOfAppendsToServerSinceLastFlush == 0) // there are no ongoing store writes && (writeOperations.size() == 0) // double checking no appends in progress - && hasActiveBlockDataToUpload()) { // there is - // some data that is pending to be written + && (bufferIndex > 0)) { // there is some data that is pending to be written smallWriteOptimizedflushInternal(isClose); return; } - if (hasActiveBlockDataToUpload()) { - uploadCurrentBlock(); - } + writeCurrentBufferToService(); flushWrittenBytesToService(isClose); numOfAppendsToServerSinceLastFlush = 0; } private synchronized void smallWriteOptimizedflushInternal(boolean isClose) throws IOException { // writeCurrentBufferToService will increment numOfAppendsToServerSinceLastFlush - uploadBlockAsync(getActiveBlock(), true, isClose); + writeCurrentBufferToService(true, isClose); waitForAppendsToComplete(); shrinkWriteOperationQueue(); maybeThrowLastError(); @@ -542,60 +389,131 @@ private synchronized void smallWriteOptimizedflushInternal(boolean isClose) thro private synchronized void flushInternalAsync() throws IOException { maybeThrowLastError(); - if (hasActiveBlockDataToUpload()) { - uploadCurrentBlock(); - } - waitForAppendsToComplete(); + writeCurrentBufferToService(); flushWrittenBytesToServiceAsync(); } - /** - * Appending the current active data block to service. Clearing the active - * data block and releasing all buffered data. - * @throws IOException if there is any failure while starting an upload for - * the dataBlock or while closing the BlockUploadData. - */ private void writeAppendBlobCurrentBufferToService() throws IOException { - DataBlocks.DataBlock activeBlock = getActiveBlock(); - // No data, return. 
- if (!hasActiveBlockDataToUpload()) { + if (bufferIndex == 0) { return; } - - final int bytesLength = activeBlock.dataSize(); - DataBlocks.BlockUploadData uploadData = activeBlock.startUpload(); - clearActiveBlock(); - outputStreamStatistics.writeCurrentBuffer(); - outputStreamStatistics.bytesToUpload(bytesLength); + final byte[] bytes = buffer; + final int bytesLength = bufferIndex; + if (outputStreamStatistics != null) { + outputStreamStatistics.writeCurrentBuffer(); + outputStreamStatistics.bytesToUpload(bytesLength); + } + buffer = byteBufferPool.getBuffer(false, bufferSize).array(); + bufferIndex = 0; final long offset = position; position += bytesLength; AbfsPerfTracker tracker = client.getAbfsPerfTracker(); try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, - "writeCurrentBufferToService", "append")) { + "writeCurrentBufferToService", "append")) { AppendRequestParameters reqParams = new AppendRequestParameters(offset, 0, bytesLength, APPEND_MODE, true, leaseId); - AbfsRestOperation op = client.append(path, uploadData.toByteArray(), reqParams, - cachedSasToken.get(), new TracingContext(tracingContext)); + AbfsRestOperation op = client + .append(path, bytes, reqParams, cachedSasToken.get(), + new TracingContext(tracingContext)); cachedSasToken.update(op.getSasToken()); - outputStreamStatistics.uploadSuccessful(bytesLength); - + if (outputStreamStatistics != null) { + outputStreamStatistics.uploadSuccessful(bytesLength); + } perfInfo.registerResult(op.getResult()); + byteBufferPool.putBuffer(ByteBuffer.wrap(bytes)); perfInfo.registerSuccess(true); return; } catch (Exception ex) { - outputStreamStatistics.uploadFailed(bytesLength); - failureWhileSubmit(ex); - } finally { - IOUtils.close(uploadData); + if (ex instanceof AbfsRestOperationException) { + if (((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { + throw new FileNotFoundException(ex.getMessage()); + } + } + if (ex instanceof AzureBlobFileSystemException) { + ex = (AzureBlobFileSystemException) ex; + } + lastError = new IOException(ex); + throw lastError; } } + private synchronized void writeCurrentBufferToService() throws IOException { + writeCurrentBufferToService(false, false); + } + + private synchronized void writeCurrentBufferToService(boolean isFlush, boolean isClose) throws IOException { + if (this.isAppendBlob) { + writeAppendBlobCurrentBufferToService(); + return; + } + + if (bufferIndex == 0) { + return; + } + numOfAppendsToServerSinceLastFlush++; + + final byte[] bytes = buffer; + final int bytesLength = bufferIndex; + if (outputStreamStatistics != null) { + outputStreamStatistics.writeCurrentBuffer(); + outputStreamStatistics.bytesToUpload(bytesLength); + } + buffer = byteBufferPool.getBuffer(false, bufferSize).array(); + bufferIndex = 0; + final long offset = position; + position += bytesLength; + + if (threadExecutor.getQueue().size() >= maxRequestsThatCanBeQueued) { + //Tracking time spent on waiting for task to complete. 
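+      // Back-pressure point: the writing thread parks here until at least
+      // one in-flight append finishes, which bounds the number of buffers
+      // queued for upload.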
+ if (outputStreamStatistics != null) { + try (DurationTracker ignored = outputStreamStatistics.timeSpentTaskWait()) { + waitForTaskToComplete(); + } + } else { + waitForTaskToComplete(); + } + } + final Future job = completionService.submit(() -> { + AbfsPerfTracker tracker = client.getAbfsPerfTracker(); + try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, + "writeCurrentBufferToService", "append")) { + AppendRequestParameters.Mode + mode = APPEND_MODE; + if (isFlush & isClose) { + mode = FLUSH_CLOSE_MODE; + } else if (isFlush) { + mode = FLUSH_MODE; + } + AppendRequestParameters reqParams = new AppendRequestParameters( + offset, 0, bytesLength, mode, false, leaseId); + AbfsRestOperation op = client.append(path, bytes, reqParams, + cachedSasToken.get(), new TracingContext(tracingContext)); + cachedSasToken.update(op.getSasToken()); + perfInfo.registerResult(op.getResult()); + byteBufferPool.putBuffer(ByteBuffer.wrap(bytes)); + perfInfo.registerSuccess(true); + return null; + } + }); + + if (outputStreamStatistics != null) { + if (job.isCancelled()) { + outputStreamStatistics.uploadFailed(bytesLength); + } else { + outputStreamStatistics.uploadSuccessful(bytesLength); + } + } + writeOperations.add(new WriteOperation(job, offset, bytesLength)); + + // Try to shrink the queue + shrinkWriteOperationQueue(); + } + private synchronized void waitForAppendsToComplete() throws IOException { for (WriteOperation writeOperation : writeOperations) { try { writeOperation.task.get(); } catch (Exception ex) { - outputStreamStatistics.uploadFailed(writeOperation.length); if (ex.getCause() instanceof AbfsRestOperationException) { if (((AbfsRestOperationException) ex.getCause()).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { throw new FileNotFoundException(ex.getMessage()); @@ -645,8 +563,7 @@ private synchronized void flushWrittenBytesToServiceInternal(final long offset, throw new FileNotFoundException(ex.getMessage()); } } - lastError = new IOException(ex); - throw lastError; + throw new IOException(ex); } this.lastFlushOffset = offset; } @@ -657,14 +574,14 @@ private synchronized void flushWrittenBytesToServiceInternal(final long offset, */ private synchronized void shrinkWriteOperationQueue() throws IOException { try { - WriteOperation peek = writeOperations.peek(); - while (peek != null && peek.task.isDone()) { - peek.task.get(); - lastTotalAppendOffset += peek.length; + while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) { + writeOperations.peek().task.get(); + lastTotalAppendOffset += writeOperations.peek().length; writeOperations.remove(); - peek = writeOperations.peek(); // Incrementing statistics to indicate queue has been shrunk. 
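      // Statistics are optional for this stream, hence the null guard
      // around the counter update below.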
- outputStreamStatistics.queueShrunk(); + if (outputStreamStatistics != null) { + outputStreamStatistics.queueShrunk(); + } } } catch (Exception e) { if (e.getCause() instanceof AzureBlobFileSystemException) { @@ -676,6 +593,26 @@ private synchronized void shrinkWriteOperationQueue() throws IOException { } } + private void waitForTaskToComplete() throws IOException { + boolean completed; + for (completed = false; completionService.poll() != null; completed = true) { + // keep polling until there is no data + } + // for AppendBLob, jobs are not submitted to completion service + if (isAppendBlob) { + completed = true; + } + + if (!completed) { + try { + completionService.take(); + } catch (InterruptedException e) { + lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e); + throw lastError; + } + } + } + private static class WriteOperation { private final Future task; private final long startOffset; @@ -694,7 +631,7 @@ private static class WriteOperation { @VisibleForTesting public synchronized void waitForPendingUploads() throws IOException { - waitForAppendsToComplete(); + waitForTaskToComplete(); } /** @@ -758,10 +695,12 @@ public boolean hasLease() { @Override public String toString() { final StringBuilder sb = new StringBuilder(super.toString()); - sb.append("AbfsOutputStream@").append(this.hashCode()); - sb.append("){"); - sb.append(outputStreamStatistics.toString()); - sb.append("}"); + if (outputStreamStatistics != null) { + sb.append("AbfsOutputStream@").append(this.hashCode()); + sb.append("){"); + sb.append(outputStreamStatistics.toString()); + sb.append("}"); + } return sb.toString(); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java index ad303823e0c..48f6f540810 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java @@ -18,12 +18,6 @@ package org.apache.hadoop.fs.azurebfs.services; -import java.util.concurrent.ExecutorService; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.azurebfs.utils.TracingContext; -import org.apache.hadoop.fs.store.DataBlocks; - /** * Class to hold extra output stream configs. 
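 * Instances are configured through the chained "with" setters and
 * finalized with build().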
*/ @@ -47,22 +41,6 @@ public class AbfsOutputStreamContext extends AbfsStreamContext { private AbfsLease lease; - private DataBlocks.BlockFactory blockFactory; - - private int blockOutputActiveBlocks; - - private AbfsClient client; - - private long position; - - private FileSystem.Statistics statistics; - - private String path; - - private ExecutorService executorService; - - private TracingContext tracingContext; - public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) { super(sasTokenRenewPeriodForStreamsInSeconds); } @@ -101,64 +79,11 @@ public AbfsOutputStreamContext withAppendBlob( return this; } - public AbfsOutputStreamContext withBlockFactory( - final DataBlocks.BlockFactory blockFactory) { - this.blockFactory = blockFactory; - return this; - } - - public AbfsOutputStreamContext withBlockOutputActiveBlocks( - final int blockOutputActiveBlocks) { - this.blockOutputActiveBlocks = blockOutputActiveBlocks; - return this; - } - - - public AbfsOutputStreamContext withClient( - final AbfsClient client) { - this.client = client; - return this; - } - - public AbfsOutputStreamContext withPosition( - final long position) { - this.position = position; - return this; - } - - public AbfsOutputStreamContext withFsStatistics( - final FileSystem.Statistics statistics) { - this.statistics = statistics; - return this; - } - - public AbfsOutputStreamContext withPath( - final String path) { - this.path = path; - return this; - } - - public AbfsOutputStreamContext withExecutorService( - final ExecutorService executorService) { - this.executorService = executorService; - return this; - } - - public AbfsOutputStreamContext withTracingContext( - final TracingContext tracingContext) { - this.tracingContext = tracingContext; - return this; - } - public AbfsOutputStreamContext build() { // Validation of parameters to be done here. 
- if (streamStatistics == null) { - streamStatistics = new AbfsOutputStreamStatisticsImpl(); - } return this; } - public AbfsOutputStreamContext withWriteMaxConcurrentRequestCount( final int writeMaxConcurrentRequestCount) { this.writeMaxConcurrentRequestCount = writeMaxConcurrentRequestCount; @@ -218,36 +143,4 @@ public String getLeaseId() { } return this.lease.getLeaseID(); } - - public DataBlocks.BlockFactory getBlockFactory() { - return blockFactory; - } - - public int getBlockOutputActiveBlocks() { - return blockOutputActiveBlocks; - } - - public AbfsClient getClient() { - return client; - } - - public FileSystem.Statistics getStatistics() { - return statistics; - } - - public String getPath() { - return path; - } - - public long getPosition() { - return position; - } - - public ExecutorService getExecutorService() { - return executorService; - } - - public TracingContext getTracingContext() { - return tracingContext; - } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java index a9e088c025b..c57d5d9bcaa 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java @@ -22,14 +22,12 @@ import org.apache.hadoop.fs.statistics.DurationTracker; import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsSource; -import org.apache.hadoop.fs.store.BlockUploadStatistics; /** * Interface for {@link AbfsOutputStream} statistics. */ @InterfaceStability.Unstable -public interface AbfsOutputStreamStatistics extends IOStatisticsSource, - BlockUploadStatistics { +public interface AbfsOutputStreamStatistics extends IOStatisticsSource { /** * Number of bytes to be uploaded. diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java index cb054e2915d..b07cf28a710 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java @@ -42,9 +42,7 @@ public class AbfsOutputStreamStatisticsImpl StreamStatisticNames.BYTES_UPLOAD_SUCCESSFUL, StreamStatisticNames.BYTES_UPLOAD_FAILED, StreamStatisticNames.QUEUE_SHRUNK_OPS, - StreamStatisticNames.WRITE_CURRENT_BUFFER_OPERATIONS, - StreamStatisticNames.BLOCKS_ALLOCATED, - StreamStatisticNames.BLOCKS_RELEASED + StreamStatisticNames.WRITE_CURRENT_BUFFER_OPERATIONS ) .withDurationTracking( StreamStatisticNames.TIME_SPENT_ON_PUT_REQUEST, @@ -62,11 +60,6 @@ public class AbfsOutputStreamStatisticsImpl private final AtomicLong writeCurrentBufferOps = ioStatisticsStore.getCounterReference(StreamStatisticNames.WRITE_CURRENT_BUFFER_OPERATIONS); - private final AtomicLong blocksAllocated = - ioStatisticsStore.getCounterReference(StreamStatisticNames.BLOCKS_ALLOCATED); - private final AtomicLong blocksReleased = - ioStatisticsStore.getCounterReference(StreamStatisticNames.BLOCKS_RELEASED); - /** * Records the need to upload bytes and increments the total bytes that * needs to be uploaded. 
@@ -140,22 +133,6 @@ public void writeCurrentBuffer() { writeCurrentBufferOps.incrementAndGet(); } - /** - * Increment the counter to indicate a block has been allocated. - */ - @Override - public void blockAllocated() { - blocksAllocated.incrementAndGet(); - } - - /** - * Increment the counter to indicate a block has been released. - */ - @Override - public void blockReleased() { - blocksReleased.incrementAndGet(); - } - /** * {@inheritDoc} * diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/AzureTestConstants.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/AzureTestConstants.java index 231c54825f2..82907c57475 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/AzureTestConstants.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/AzureTestConstants.java @@ -20,8 +20,6 @@ import org.apache.hadoop.fs.Path; -import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE; - /** * Constants for the Azure tests. */ @@ -177,15 +175,4 @@ public interface AzureTestConstants { * Base directory for page blobs. */ Path PAGE_BLOB_DIR = new Path("/" + DEFAULT_PAGE_BLOB_DIRECTORY); - - /** - * Huge file for testing AbfsOutputStream uploads: {@value} - */ - String AZURE_SCALE_HUGE_FILE_UPLOAD = AZURE_SCALE_TEST + "huge.upload"; - - /** - * Default value for Huge file to be tested for AbfsOutputStream uploads: - * {@value} - */ - int AZURE_SCALE_HUGE_FILE_UPLOAD_DEFAULT = 2 * DEFAULT_WRITE_BUFFER_SIZE; } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java index 56d553819fe..d4e44c37f63 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java @@ -488,7 +488,7 @@ protected AbfsDelegationTokenManager getDelegationTokenManager() */ protected AbfsOutputStream createAbfsOutputStreamWithFlushEnabled( AzureBlobFileSystem fs, - Path path) throws IOException { + Path path) throws AzureBlobFileSystemException { AzureBlobFileSystemStore abfss = fs.getAbfsStore(); abfss.getAbfsConfiguration().setDisableOutputStreamFlush(false); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsScaleTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsScaleTest.java index b74b309bd4f..14c9bff7bf8 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsScaleTest.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsScaleTest.java @@ -24,10 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.azure.integration.AzureTestConstants; -import static org.apache.hadoop.fs.azure.integration.AzureTestConstants.AZURE_SCALE_HUGE_FILE_UPLOAD; -import static org.apache.hadoop.fs.azure.integration.AzureTestConstants.AZURE_SCALE_HUGE_FILE_UPLOAD_DEFAULT; import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.assumeScaleTestsEnabled; -import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.getTestPropertyInt; /** * Integration tests at bigger scale; configurable as to @@ -37,7 +34,6 @@ public class 
AbstractAbfsScaleTest extends AbstractAbfsIntegrationTest { protected static final Logger LOG = LoggerFactory.getLogger(AbstractAbfsScaleTest.class); - private static Configuration rawConfiguration; public AbstractAbfsScaleTest() throws Exception { super(); @@ -52,7 +48,7 @@ protected int getTestTimeoutMillis() { public void setup() throws Exception { super.setup(); LOG.debug("Scale test operation count = {}", getOperationCount()); - rawConfiguration = getRawConfiguration(); + Configuration rawConfiguration = getRawConfiguration(); assumeScaleTestsEnabled(rawConfiguration); } @@ -60,15 +56,4 @@ protected long getOperationCount() { return getConfiguration().getLong(AzureTestConstants.KEY_OPERATION_COUNT, AzureTestConstants.DEFAULT_OPERATION_COUNT); } - - - /** - * Method to get the Huge file for upload value for scale test. - * @return the huge value set. - */ - public static int getHugeFileUploadValue() { - return getTestPropertyInt(rawConfiguration, - AZURE_SCALE_HUGE_FILE_UPLOAD, - AZURE_SCALE_HUGE_FILE_UPLOAD_DEFAULT); - } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsHugeFiles.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsHugeFiles.java deleted file mode 100644 index 6473603c11b..00000000000 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsHugeFiles.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.azurebfs; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Random; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.Path; - -import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.assume; -import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE; - -/** - * Testing Huge file for AbfsOutputStream. - */ -@RunWith(Parameterized.class) -public class ITestAbfsHugeFiles extends AbstractAbfsScaleTest { - private static final int ONE_MB = 1024 * 1024; - private static final int EIGHT_MB = 8 * ONE_MB; - private final int size; - - @Parameterized.Parameters(name = "Size={0}") - public static Iterable sizes() { - return Arrays.asList(new Object[][] { - { DEFAULT_WRITE_BUFFER_SIZE }, - { getHugeFileUploadValue() } }); - } - - public ITestAbfsHugeFiles(int size) throws Exception { - this.size = size; - } - - /** - * Testing Huge files written at once on AbfsOutputStream. 
- */ - @Test - public void testHugeFileWrite() throws IOException { - AzureBlobFileSystem fs = getFileSystem(); - Path filePath = path(getMethodName()); - final byte[] b = new byte[size]; - new Random().nextBytes(b); - try (FSDataOutputStream out = fs.create(filePath)) { - out.write(b); - } - // Verify correct length was uploaded. Don't want to verify contents - // here, as this would increase the test time significantly. - assertEquals("Mismatch in content length of file uploaded", size, - fs.getFileStatus(filePath).getLen()); - } - - /** - * Testing Huge files written in chunks of 8M in lots of writes. - */ - @Test - public void testLotsOfWrites() throws IOException { - assume("If the size isn't a multiple of 8M this test would not pass, so " - + "skip", size % EIGHT_MB == 0); - AzureBlobFileSystem fs = getFileSystem(); - Path filePath = path(getMethodName()); - final byte[] b = new byte[size]; - new Random().nextBytes(b); - try (FSDataOutputStream out = fs.create(filePath)) { - int offset = 0; - for (int i = 0; i < size / EIGHT_MB; i++) { - out.write(b, offset, EIGHT_MB); - offset += EIGHT_MB; - } - } - LOG.info(String.valueOf(size % EIGHT_MB)); - // Verify correct length was uploaded. Don't want to verify contents - // here, as this would increase the test time significantly. - assertEquals("Mismatch in content length of file uploaded", size, - fs.getFileStatus(filePath).getLen()); - } -} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java index a4d23b3b28b..2894abe4d0e 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java @@ -48,6 +48,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT; import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_ACQUIRING_LEASE; import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_LEASE_EXPIRED; +import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_LEASE_NOT_PRESENT; import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_ID_SPECIFIED; import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_THREADS; import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_PARALLEL_ACCESS_DETECTED; @@ -294,10 +295,15 @@ public void testFileSystemClose() throws Exception { FSDataOutputStream out = fs.create(testFilePath); out.write(0); Assert.assertFalse("Store leases should exist", fs.getAbfsStore().areLeasesFreed()); - out.close(); fs.close(); Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed()); + LambdaTestUtils.intercept(IOException.class, isHNSEnabled ? 
ERR_LEASE_NOT_PRESENT + : ERR_LEASE_EXPIRED, () -> { + out.close(); + return "Expected exception on close after closed FS but got " + out; + }); + LambdaTestUtils.intercept(RejectedExecutionException.class, () -> { try (FSDataOutputStream out2 = fs.append(testFilePath)) { } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java index 063b0641c9b..f01c81b74ee 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java @@ -21,28 +21,18 @@ import java.io.IOException; import java.util.Arrays; import java.util.Random; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat; -import org.apache.hadoop.fs.store.DataBlocks; -import org.apache.hadoop.util.BlockingThreadPoolExecutorService; -import org.apache.hadoop.util.SemaphoredDelegatingExecutor; -import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER; -import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR; -import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT; -import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.ArgumentMatchers.refEq; @@ -68,27 +58,12 @@ public final class TestAbfsOutputStream { private final String accountKey1 = globalKey + "." 
+ accountName1; private final String accountValue1 = "one"; - private AbfsOutputStreamContext populateAbfsOutputStreamContext( - int writeBufferSize, - boolean isFlushEnabled, - boolean disableOutputStreamFlush, - boolean isAppendBlob, - AbfsClient client, - FileSystem.Statistics statistics, - String path, - TracingContext tracingContext, - ExecutorService executorService) throws IOException, - IllegalAccessException { + private AbfsOutputStreamContext populateAbfsOutputStreamContext(int writeBufferSize, + boolean isFlushEnabled, + boolean disableOutputStreamFlush, + boolean isAppendBlob) throws IOException, IllegalAccessException { AbfsConfiguration abfsConf = new AbfsConfiguration(new Configuration(), accountName1); - String blockFactoryName = - abfsConf.getRawConfiguration().getTrimmed(DATA_BLOCKS_BUFFER, - DATA_BLOCKS_BUFFER_DEFAULT); - DataBlocks.BlockFactory blockFactory = - DataBlocks.createFactory(FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR, - abfsConf.getRawConfiguration(), - blockFactoryName); - return new AbfsOutputStreamContext(2) .withWriteBufferSize(writeBufferSize) .enableFlush(isFlushEnabled) @@ -97,12 +72,6 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext( .withAppendBlob(isAppendBlob) .withWriteMaxConcurrentRequestCount(abfsConf.getWriteMaxConcurrentRequestCount()) .withMaxWriteRequestsToQueue(abfsConf.getMaxWriteRequestsToQueue()) - .withClient(client) - .withPath(path) - .withFsStatistics(statistics) - .withTracingContext(tracingContext) - .withExecutorService(executorService) - .withBlockFactory(blockFactory) .build(); } @@ -126,19 +95,11 @@ public void verifyShortWriteRequest() throws Exception { when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull(), any(TracingContext.class))).thenReturn(op); - AbfsOutputStream out = new AbfsOutputStream( - populateAbfsOutputStreamContext( - BUFFER_SIZE, - true, - false, - false, - client, - null, - PATH, - new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id", - FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(), - null), - createExecutorService(abfsConf))); + AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, + populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false), + new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id", + FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(), + null)); final byte[] b = new byte[WRITE_SIZE]; new Random().nextBytes(b); out.write(b); @@ -188,17 +149,9 @@ public void verifyWriteRequest() throws Exception { when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), any(TracingContext.class))).thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull(), any(TracingContext.class))).thenReturn(op); - AbfsOutputStream out = new AbfsOutputStream( - populateAbfsOutputStreamContext( - BUFFER_SIZE, - true, - false, - false, - client, - null, - PATH, - tracingContext, - createExecutorService(abfsConf))); + AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, + populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false), + tracingContext); final byte[] b = new byte[WRITE_SIZE]; new Random().nextBytes(b); @@ -263,17 +216,9 @@ public void verifyWriteRequestOfBufferSizeAndClose() throws Exception { when(op.getSasToken()).thenReturn("testToken"); when(op.getResult()).thenReturn(httpOp); - AbfsOutputStream out = new AbfsOutputStream( - populateAbfsOutputStreamContext( - BUFFER_SIZE, - true, - false, - false, - client, - 
null, - PATH, - tracingContext, - createExecutorService(abfsConf))); + AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, + populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false), + tracingContext); final byte[] b = new byte[BUFFER_SIZE]; new Random().nextBytes(b); @@ -335,19 +280,11 @@ public void verifyWriteRequestOfBufferSize() throws Exception { when(op.getSasToken()).thenReturn("testToken"); when(op.getResult()).thenReturn(httpOp); - AbfsOutputStream out = new AbfsOutputStream( - populateAbfsOutputStreamContext( - BUFFER_SIZE, - true, - false, - false, - client, - null, - PATH, - new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id", - FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(), - null), - createExecutorService(abfsConf))); + AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, + populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false), + new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id", + FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(), + null)); final byte[] b = new byte[BUFFER_SIZE]; new Random().nextBytes(b); @@ -391,19 +328,11 @@ public void verifyWriteRequestOfBufferSizeWithAppendBlob() throws Exception { when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull(), any(TracingContext.class))).thenReturn(op); - AbfsOutputStream out = new AbfsOutputStream( - populateAbfsOutputStreamContext( - BUFFER_SIZE, - true, - false, - true, - client, - null, - PATH, - new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id", - FSOperationType.OPEN, abfsConf.getTracingHeaderFormat(), - null), - createExecutorService(abfsConf))); + AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, + populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, true), + new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id", + FSOperationType.OPEN, abfsConf.getTracingHeaderFormat(), + null)); final byte[] b = new byte[BUFFER_SIZE]; new Random().nextBytes(b); @@ -451,19 +380,10 @@ public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception { when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull(), any(TracingContext.class))).thenReturn(op); - AbfsOutputStream out = new AbfsOutputStream( - populateAbfsOutputStreamContext( - BUFFER_SIZE, - true, - false, - false, - client, - null, - PATH, - new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id", - FSOperationType.OPEN, abfsConf.getTracingHeaderFormat(), - null), - createExecutorService(abfsConf))); + AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, + populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false), new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id", + FSOperationType.OPEN, abfsConf.getTracingHeaderFormat(), + null)); final byte[] b = new byte[BUFFER_SIZE]; new Random().nextBytes(b); @@ -521,19 +441,11 @@ public void verifyWriteRequestOfBufferSizeAndFlush() throws Exception { when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull(), any(TracingContext.class))).thenReturn(op); - AbfsOutputStream out = new AbfsOutputStream( - populateAbfsOutputStreamContext( - BUFFER_SIZE, - true, - false, - false, - client, - null, - PATH, - new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id", - FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(), - null), - createExecutorService(abfsConf))); + AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0, + 
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false), + new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id", + FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(), + null)); final byte[] b = new byte[BUFFER_SIZE]; new Random().nextBytes(b); @@ -557,22 +469,4 @@ public void verifyWriteRequestOfBufferSizeAndFlush() throws Exception { verify(client, times(2)).append( eq(PATH), any(byte[].class), any(), any(), any(TracingContext.class)); } - - /** - * Method to create an executor Service for AbfsOutputStream. - * @param abfsConf Configuration. - * @return ExecutorService. - */ - private ExecutorService createExecutorService( - AbfsConfiguration abfsConf) { - ExecutorService executorService = - new SemaphoredDelegatingExecutor(BlockingThreadPoolExecutorService.newInstance( - abfsConf.getWriteMaxConcurrentRequestCount(), - abfsConf.getMaxWriteRequestsToQueue(), - 10L, TimeUnit.SECONDS, - "abfs-test-bounded"), - BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT, true); - return executorService; - } - }
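
The revert replaces the DataBlocks block-manager abstraction with a plain byte
array taken from an ElasticByteBufferPool. A minimal standalone sketch of that
reuse cycle follows (not part of the patch; the class name and buffer size are
illustrative, the pool API is the real org.apache.hadoop.io.ElasticByteBufferPool):

import java.nio.ByteBuffer;

import org.apache.hadoop.io.ElasticByteBufferPool;

public class BufferReuseSketch {
  public static void main(String[] args) {
    ElasticByteBufferPool pool = new ElasticByteBufferPool();
    final int bufferSize = 8 * 1024 * 1024; // a typical ABFS write buffer size

    // getBuffer(false, n) returns a heap buffer of at least n bytes,
    // reusing a previously pooled one when available.
    byte[] buffer = pool.getBuffer(false, bufferSize).array();

    // ... the stream copies user data into 'buffer' with System.arraycopy,
    // then hands the array to client.append() on a worker thread ...

    // Once the append has completed, the array is wrapped and returned,
    // so the next writeCurrentBufferToService() call can reuse it.
    pool.putBuffer(ByteBuffer.wrap(buffer));
  }
}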