diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 6b194a41de2..62145e10b1b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -84,6 +84,7 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamContext; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl; import org.apache.hadoop.fs.azurebfs.services.AbfsPermission; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.AuthType; @@ -426,6 +427,7 @@ public class AzureBlobFileSystemStore implements Closeable { .withWriteBufferSize(abfsConfiguration.getWriteBufferSize()) .enableFlush(abfsConfiguration.isFlushEnabled()) .disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled()) + .withStreamStatistics(new AbfsOutputStreamStatisticsImpl()) .build(); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index f29cc4afe82..4297b18651b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -35,6 +35,8 @@ import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; @@ -50,6 +52,7 @@ import static org.apache.hadoop.io.IOUtils.wrapException; * The BlobFsOutputStream for Rest AbfsClient. */ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCapabilities { + private final AbfsClient client; private final String path; private long position; @@ -80,6 +83,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa = new ElasticByteBufferPool(); private final Statistics statistics; + private final AbfsOutputStreamStatistics outputStreamStatistics; + + private static final Logger LOG = + LoggerFactory.getLogger(AbfsOutputStream.class); public AbfsOutputStream( final AbfsClient client, @@ -101,6 +108,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa this.buffer = byteBufferPool.getBuffer(false, bufferSize).array(); this.bufferIndex = 0; this.writeOperations = new ConcurrentLinkedDeque<>(); + this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics(); this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); @@ -278,6 +286,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa threadExecutor.shutdownNow(); } } + if (LOG.isDebugEnabled()) { + LOG.debug("Closing AbfsOutputStream ", toString()); + } } private synchronized void flushInternal(boolean isClose) throws IOException { @@ -296,16 +307,20 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa if (bufferIndex == 0) { return; } + outputStreamStatistics.writeCurrentBuffer(); final byte[] bytes = buffer; final int bytesLength = bufferIndex; + outputStreamStatistics.bytesToUpload(bytesLength); buffer = byteBufferPool.getBuffer(false, bufferSize).array(); bufferIndex = 0; final long offset = position; position += bytesLength; if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) { + long start = System.currentTimeMillis(); waitForTaskToComplete(); + outputStreamStatistics.timeSpentTaskWait(start, System.currentTimeMillis()); } final Future job = completionService.submit(new Callable() { @@ -324,6 +339,11 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa } }); + if (job.isCancelled()) { + outputStreamStatistics.uploadFailed(bytesLength); + } else { + outputStreamStatistics.uploadSuccessful(bytesLength); + } writeOperations.add(new WriteOperation(job, offset, bytesLength)); // Try to shrink the queue @@ -388,6 +408,8 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa writeOperations.peek().task.get(); lastTotalAppendOffset += writeOperations.peek().length; writeOperations.remove(); + // Incrementing statistics to indicate queue has been shrunk. + outputStreamStatistics.queueShrunk(); } } catch (Exception e) { if (e.getCause() instanceof AzureBlobFileSystemException) { @@ -435,4 +457,38 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa public synchronized void waitForPendingUploads() throws IOException { waitForTaskToComplete(); } + + /** + * Getter method for AbfsOutputStream statistics. + * + * @return statistics for AbfsOutputStream. + */ + @VisibleForTesting + public AbfsOutputStreamStatistics getOutputStreamStatistics() { + return outputStreamStatistics; + } + + /** + * Getter to get the size of the task queue. + * + * @return the number of writeOperations in AbfsOutputStream. + */ + @VisibleForTesting + public int getWriteOperationsSize() { + return writeOperations.size(); + } + + /** + * Appending AbfsOutputStream statistics to base toString(). + * + * @return String with AbfsOutputStream statistics. + */ + @Override + public String toString() { + final StringBuilder sb = new StringBuilder(super.toString()); + sb.append("AbfsOuputStream@").append(this.hashCode()).append("){"); + sb.append(outputStreamStatistics.toString()); + sb.append("}"); + return sb.toString(); + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java index 0be97c5d9f1..e0aefbf33b2 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java @@ -29,6 +29,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext { private boolean disableOutputStreamFlush; + private AbfsOutputStreamStatistics streamStatistics; + public AbfsOutputStreamContext() { } @@ -49,6 +51,12 @@ public class AbfsOutputStreamContext extends AbfsStreamContext { return this; } + public AbfsOutputStreamContext withStreamStatistics( + final AbfsOutputStreamStatistics streamStatistics) { + this.streamStatistics = streamStatistics; + return this; + } + public AbfsOutputStreamContext build() { // Validation of parameters to be done here. return this; @@ -65,4 +73,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext { public boolean isDisableOutputStreamFlush() { return disableOutputStreamFlush; } + + public AbfsOutputStreamStatistics getStreamStatistics() { + return streamStatistics; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java new file mode 100644 index 00000000000..c9fe0dd4552 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Interface for {@link AbfsOutputStream} statistics. + */ +@InterfaceStability.Unstable +public interface AbfsOutputStreamStatistics { + + /** + * Number of bytes to be uploaded. + * + * @param bytes number of bytes to upload. + */ + void bytesToUpload(long bytes); + + /** + * Records a successful upload and the number of bytes uploaded. + * + * @param bytes number of bytes that were successfully uploaded. + */ + void uploadSuccessful(long bytes); + + /** + * Records that upload is failed and the number of bytes. + * + * @param bytes number of bytes that failed to upload. + */ + void uploadFailed(long bytes); + + /** + * Time spent in waiting for tasks to be completed in the blocking queue. + * + * @param start millisecond at which the wait for task to be complete begins. + * @param end millisecond at which the wait is completed for the task. + */ + void timeSpentTaskWait(long start, long end); + + /** + * Number of times task queue is shrunk. + */ + void queueShrunk(); + + /** + * Number of times buffer is written to the service after a write operation. + */ + void writeCurrentBuffer(); + + /** + * Method to form a string of all AbfsOutputStream statistics and their + * values. + * + * @return AbfsOutputStream statistics. + */ + @Override + String toString(); + +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java new file mode 100644 index 00000000000..cd5a29e217c --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +/** + * OutputStream statistics implementation for Abfs. + */ +public class AbfsOutputStreamStatisticsImpl + implements AbfsOutputStreamStatistics { + private long bytesToUpload; + private long bytesUploadSuccessful; + private long bytesUploadFailed; + /** + * Counter to get the total time spent while waiting for tasks to complete + * in the blocking queue inside the thread executor. + */ + private long timeSpentOnTaskWait; + /** + * Counter to get the total number of queue shrink operations done {@code + * AbfsOutputStream#shrinkWriteOperationQueue()} by AbfsOutputStream to + * remove the write operations which were successfully done by + * AbfsOutputStream from the task queue. + */ + private long queueShrunkOps; + /** + * Counter to get the total number of times the current buffer is written + * to the service {@code AbfsOutputStream#writeCurrentBufferToService()} via + * AbfsClient and appended to the data store by AbfsRestOperation. + */ + private long writeCurrentBufferOperations; + + /** + * Records the need to upload bytes and increments the total bytes that + * needs to be uploaded. + * + * @param bytes total bytes to upload. Negative bytes are ignored. + */ + @Override + public void bytesToUpload(long bytes) { + if (bytes > 0) { + bytesToUpload += bytes; + } + } + + /** + * Records the total bytes successfully uploaded through AbfsOutputStream. + * + * @param bytes number of bytes that were successfully uploaded. Negative + * bytes are ignored. + */ + @Override + public void uploadSuccessful(long bytes) { + if (bytes > 0) { + bytesUploadSuccessful += bytes; + } + } + + /** + * Records the total bytes failed to upload through AbfsOutputStream. + * + * @param bytes number of bytes failed to upload. Negative bytes are ignored. + */ + @Override + public void uploadFailed(long bytes) { + if (bytes > 0) { + bytesUploadFailed += bytes; + } + } + + /** + * {@inheritDoc} + * + * Records the total time spent waiting for a task to complete. + * + * When the thread executor has a task queue + * {@link java.util.concurrent.BlockingQueue} of size greater than or + * equal to 2 times the maxConcurrentRequestCounts then, it waits for a + * task in that queue to finish, then do the next task in the queue. + * + * This time spent while waiting for the task to be completed is being + * recorded in this counter. + * + * @param startTime time(in milliseconds) before the wait for task to be + * completed is begin. + * @param endTime time(in milliseconds) after the wait for the task to be + * completed is done. + */ + @Override + public void timeSpentTaskWait(long startTime, long endTime) { + timeSpentOnTaskWait += endTime - startTime; + } + + /** + * {@inheritDoc} + * + * Records the number of times AbfsOutputStream try to remove the completed + * write operations from the beginning of write operation task queue. + */ + @Override + public void queueShrunk() { + queueShrunkOps++; + } + + /** + * {@inheritDoc} + * + * Records the number of times AbfsOutputStream writes the buffer to the + * service via the AbfsClient and appends the buffer to the service. + */ + @Override + public void writeCurrentBuffer() { + writeCurrentBufferOperations++; + } + + public long getBytesToUpload() { + return bytesToUpload; + } + + public long getBytesUploadSuccessful() { + return bytesUploadSuccessful; + } + + public long getBytesUploadFailed() { + return bytesUploadFailed; + } + + public long getTimeSpentOnTaskWait() { + return timeSpentOnTaskWait; + } + + public long getQueueShrunkOps() { + return queueShrunkOps; + } + + public long getWriteCurrentBufferOperations() { + return writeCurrentBufferOperations; + } + + /** + * String to show AbfsOutputStream statistics values in AbfsOutputStream. + * + * @return String with AbfsOutputStream statistics. + */ + @Override public String toString() { + final StringBuilder outputStreamStats = new StringBuilder( + "OutputStream Statistics{"); + outputStreamStats.append(", bytes_upload=").append(bytesToUpload); + outputStreamStats.append(", bytes_upload_successfully=") + .append(bytesUploadSuccessful); + outputStreamStats.append(", bytes_upload_failed=") + .append(bytesUploadFailed); + outputStreamStats.append(", time_spent_task_wait=") + .append(timeSpentOnTaskWait); + outputStreamStats.append(", queue_shrunk_ops=").append(queueShrunkOps); + outputStreamStats.append(", write_current_buffer_ops=") + .append(writeCurrentBufferOperations); + outputStreamStats.append("}"); + return outputStreamStats.toString(); + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java index a42648fc185..4d9fc5cae73 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java @@ -33,7 +33,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.services.AuthType; import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore; import org.apache.hadoop.fs.azure.NativeAzureFileSystem; @@ -43,6 +45,7 @@ import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.utils.UriUtils; import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; import static org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount.WASB_ACCOUNT_NAME_DOMAIN_SUFFIX; @@ -270,6 +273,11 @@ public abstract class AbstractAbfsIntegrationTest extends protected void setFileSystemName(String fileSystemName) { this.fileSystemName = fileSystemName; } + + protected String getMethodName() { + return methodName.getMethodName(); + } + protected String getFileSystemName() { return fileSystemName; } @@ -383,4 +391,22 @@ public abstract class AbstractAbfsIntegrationTest extends throws IOException { return getFileSystem().getDelegationTokenManager(); } + + /** + * Generic create File and enabling AbfsOutputStream Flush. + * + * @param fs AzureBlobFileSystem that is initialised in the test. + * @param path Path of the file to be created. + * @return AbfsOutputStream for writing. + * @throws AzureBlobFileSystemException + */ + protected AbfsOutputStream createAbfsOutputStreamWithFlushEnabled( + AzureBlobFileSystem fs, + Path path) throws AzureBlobFileSystemException { + AzureBlobFileSystemStore abfss = fs.getAbfsStore(); + abfss.getAbfsConfiguration().setDisableOutputStreamFlush(false); + + return (AbfsOutputStream) abfss.createFile(path, fs.getFsStatistics(), + true, FsPermission.getDefault(), FsPermission.getUMask(fs.getConf())); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java new file mode 100644 index 00000000000..09cbfde1beb --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; + +import org.junit.Test; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl; + +/** + * Test AbfsOutputStream statistics. + */ +public class ITestAbfsOutputStreamStatistics + extends AbstractAbfsIntegrationTest { + private static final int OPERATIONS = 10; + + public ITestAbfsOutputStreamStatistics() throws Exception { + } + + /** + * Tests to check bytes uploaded successfully in {@link AbfsOutputStream}. + */ + @Test + public void testAbfsOutputStreamUploadingBytes() throws IOException { + describe("Testing bytes uploaded successfully by AbfsOutputSteam"); + final AzureBlobFileSystem fs = getFileSystem(); + Path uploadBytesFilePath = path(getMethodName()); + String testBytesToUpload = "bytes"; + + try ( + AbfsOutputStream outForSomeBytes = createAbfsOutputStreamWithFlushEnabled( + fs, uploadBytesFilePath) + ) { + + AbfsOutputStreamStatisticsImpl abfsOutputStreamStatisticsForUploadBytes = + getAbfsOutputStreamStatistics(outForSomeBytes); + + //Test for zero bytes To upload. + assertEquals("Mismatch in bytes to upload", 0, + abfsOutputStreamStatisticsForUploadBytes.getBytesToUpload()); + + outForSomeBytes.write(testBytesToUpload.getBytes()); + outForSomeBytes.flush(); + abfsOutputStreamStatisticsForUploadBytes = + getAbfsOutputStreamStatistics(outForSomeBytes); + + //Test for bytes to upload. + assertEquals("Mismatch in bytes to upload", + testBytesToUpload.getBytes().length, + abfsOutputStreamStatisticsForUploadBytes.getBytesToUpload()); + + //Test for successful bytes uploaded. + assertEquals("Mismatch in successful bytes uploaded", + testBytesToUpload.getBytes().length, + abfsOutputStreamStatisticsForUploadBytes.getBytesUploadSuccessful()); + + } + + try ( + AbfsOutputStream outForLargeBytes = createAbfsOutputStreamWithFlushEnabled( + fs, uploadBytesFilePath)) { + + for (int i = 0; i < OPERATIONS; i++) { + outForLargeBytes.write(testBytesToUpload.getBytes()); + } + outForLargeBytes.flush(); + AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics = + getAbfsOutputStreamStatistics(outForLargeBytes); + + //Test for bytes to upload. + assertEquals("Mismatch in bytes to upload", + OPERATIONS * (testBytesToUpload.getBytes().length), + abfsOutputStreamStatistics.getBytesToUpload()); + + //Test for successful bytes uploaded. + assertEquals("Mismatch in successful bytes uploaded", + OPERATIONS * (testBytesToUpload.getBytes().length), + abfsOutputStreamStatistics.getBytesUploadSuccessful()); + + } + } + + /** + * Tests to check correct values of queue shrunk operations in + * AbfsOutputStream. + * + * After writing data, AbfsOutputStream doesn't upload the data until + * flushed. Hence, flush() method is called after write() to test queue + * shrink operations. + */ + @Test + public void testAbfsOutputStreamQueueShrink() throws IOException { + describe("Testing queue shrink operations by AbfsOutputStream"); + final AzureBlobFileSystem fs = getFileSystem(); + Path queueShrinkFilePath = path(getMethodName()); + String testQueueShrink = "testQueue"; + + try (AbfsOutputStream outForOneOp = createAbfsOutputStreamWithFlushEnabled( + fs, queueShrinkFilePath)) { + + AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics = + getAbfsOutputStreamStatistics(outForOneOp); + + //Test for shrinking queue zero time. + assertEquals("Mismatch in queue shrunk operations", 0, + abfsOutputStreamStatistics.getQueueShrunkOps()); + + } + + /* + * After writing in the loop we flush inside the loop to ensure the write + * operation done in that loop is considered to be done which would help + * us triggering the shrinkWriteOperationQueue() method each time after + * the write operation. + * If we call flush outside the loop, then it will take all the write + * operations inside the loop as one write operation. + * + */ + try ( + AbfsOutputStream outForLargeOps = createAbfsOutputStreamWithFlushEnabled( + fs, queueShrinkFilePath)) { + for (int i = 0; i < OPERATIONS; i++) { + outForLargeOps.write(testQueueShrink.getBytes()); + outForLargeOps.flush(); + } + + AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics = + getAbfsOutputStreamStatistics(outForLargeOps); + /* + * After a write operation is done, it is in a task queue where it is + * removed. Hence, to get the correct expected value we get the size of + * the task queue from AbfsOutputStream and subtract it with total + * write operations done to get the number of queue shrinks done. + * + */ + assertEquals("Mismatch in queue shrunk operations", + OPERATIONS - outForLargeOps.getWriteOperationsSize(), + abfsOutputStreamStatistics.getQueueShrunkOps()); + } + + } + + /** + * Tests to check correct values of write current buffer operations done by + * AbfsOutputStream. + * + * After writing data, AbfsOutputStream doesn't upload data till flush() is + * called. Hence, flush() calls were made after write(). + */ + @Test + public void testAbfsOutputStreamWriteBuffer() throws IOException { + describe("Testing write current buffer operations by AbfsOutputStream"); + final AzureBlobFileSystem fs = getFileSystem(); + Path writeBufferFilePath = path(getMethodName()); + String testWriteBuffer = "Buffer"; + + try (AbfsOutputStream outForOneOp = createAbfsOutputStreamWithFlushEnabled( + fs, writeBufferFilePath)) { + + AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics = + getAbfsOutputStreamStatistics(outForOneOp); + + //Test for zero time writing buffer to service. + assertEquals("Mismatch in write current buffer operations", 0, + abfsOutputStreamStatistics.getWriteCurrentBufferOperations()); + + outForOneOp.write(testWriteBuffer.getBytes()); + outForOneOp.flush(); + + abfsOutputStreamStatistics = getAbfsOutputStreamStatistics(outForOneOp); + + //Test for one time writing buffer to service. + assertEquals("Mismatch in write current buffer operations", 1, + abfsOutputStreamStatistics.getWriteCurrentBufferOperations()); + } + + try ( + AbfsOutputStream outForLargeOps = createAbfsOutputStreamWithFlushEnabled( + fs, writeBufferFilePath)) { + + /* + * Need to flush each time after we write to actually write the data + * into the data store and thus, get the writeCurrentBufferToService() + * method triggered and increment the statistic. + */ + for (int i = 0; i < OPERATIONS; i++) { + outForLargeOps.write(testWriteBuffer.getBytes()); + outForLargeOps.flush(); + } + AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics = + getAbfsOutputStreamStatistics(outForLargeOps); + //Test for 10 times writing buffer to service. + assertEquals("Mismatch in write current buffer operations", + OPERATIONS, + abfsOutputStreamStatistics.getWriteCurrentBufferOperations()); + } + } + + /** + * Method to get the AbfsOutputStream statistics. + * + * @param out AbfsOutputStream whose statistics is needed. + * @return AbfsOutputStream statistics implementation class to get the + * values of the counters. + */ + private static AbfsOutputStreamStatisticsImpl getAbfsOutputStreamStatistics( + AbfsOutputStream out) { + return (AbfsOutputStreamStatisticsImpl) out.getOutputStreamStatistics(); + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOutputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOutputStreamStatistics.java new file mode 100644 index 00000000000..58f00233710 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOutputStreamStatistics.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.util.Random; + +import org.junit.Test; + +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl; + +/** + * Unit tests for AbfsOutputStream statistics. + */ +public class TestAbfsOutputStreamStatistics + extends AbstractAbfsIntegrationTest { + + private static final int LOW_RANGE_FOR_RANDOM_VALUE = 49; + private static final int HIGH_RANGE_FOR_RANDOM_VALUE = 9999; + private static final int OPERATIONS = 10; + + public TestAbfsOutputStreamStatistics() throws Exception { + } + + /** + * Tests to check number of bytes failed to upload in + * {@link AbfsOutputStream}. + */ + @Test + public void testAbfsOutputStreamBytesFailed() { + describe("Testing number of bytes failed during upload in AbfsOutputSteam"); + + AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics = + new AbfsOutputStreamStatisticsImpl(); + + //Test for zero bytes uploaded. + assertEquals("Mismatch in number of bytes failed to upload", 0, + abfsOutputStreamStatistics.getBytesUploadFailed()); + + //Populating small random value for bytesFailed. + int randomBytesFailed = new Random().nextInt(LOW_RANGE_FOR_RANDOM_VALUE); + abfsOutputStreamStatistics.uploadFailed(randomBytesFailed); + //Test for bytes failed to upload. + assertEquals("Mismatch in number of bytes failed to upload", + randomBytesFailed, abfsOutputStreamStatistics.getBytesUploadFailed()); + + //Reset statistics for the next test. + abfsOutputStreamStatistics = new AbfsOutputStreamStatisticsImpl(); + + /* + * Entering multiple random values for bytesFailed to check correct + * summation of values. + */ + int expectedBytesFailed = 0; + for (int i = 0; i < OPERATIONS; i++) { + randomBytesFailed = new Random().nextInt(HIGH_RANGE_FOR_RANDOM_VALUE); + abfsOutputStreamStatistics.uploadFailed(randomBytesFailed); + expectedBytesFailed += randomBytesFailed; + } + //Test for bytes failed to upload. + assertEquals("Mismatch in number of bytes failed to upload", + expectedBytesFailed, abfsOutputStreamStatistics.getBytesUploadFailed()); + } + + /** + * Tests to check time spent on waiting for tasks to be complete on a + * blocking queue in {@link AbfsOutputStream}. + */ + @Test + public void testAbfsOutputStreamTimeSpentOnWaitTask() { + describe("Testing time Spent on waiting for task to be completed in " + + "AbfsOutputStream"); + + AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics = + new AbfsOutputStreamStatisticsImpl(); + + //Test for initial value of timeSpentWaitTask. + assertEquals("Mismatch in time spent on waiting for tasks to complete", 0, + abfsOutputStreamStatistics.getTimeSpentOnTaskWait()); + + int smallRandomStartTime = + new Random().nextInt(LOW_RANGE_FOR_RANDOM_VALUE); + int smallRandomEndTime = + new Random().nextInt(LOW_RANGE_FOR_RANDOM_VALUE) + + smallRandomStartTime; + int smallDiff = smallRandomEndTime - smallRandomStartTime; + abfsOutputStreamStatistics + .timeSpentTaskWait(smallRandomStartTime, smallRandomEndTime); + //Test for small random value of timeSpentWaitTask. + assertEquals("Mismatch in time spent on waiting for tasks to complete", + smallDiff, abfsOutputStreamStatistics.getTimeSpentOnTaskWait()); + + //Reset statistics for the next test. + abfsOutputStreamStatistics = new AbfsOutputStreamStatisticsImpl(); + + /* + * Entering multiple values for timeSpentTaskWait() to check the + * summation is happening correctly. Also calculating the expected result. + */ + int expectedRandomDiff = 0; + for (int i = 0; i < OPERATIONS; i++) { + int largeRandomStartTime = + new Random().nextInt(HIGH_RANGE_FOR_RANDOM_VALUE); + int largeRandomEndTime = new Random().nextInt(HIGH_RANGE_FOR_RANDOM_VALUE) + + largeRandomStartTime; + abfsOutputStreamStatistics + .timeSpentTaskWait(largeRandomStartTime, largeRandomEndTime); + expectedRandomDiff += largeRandomEndTime - largeRandomStartTime; + } + + /* + * Test to check correct value of timeSpentTaskWait after multiple + * random values are passed in it. + */ + assertEquals("Mismatch in time spent on waiting for tasks to complete", + expectedRandomDiff, + abfsOutputStreamStatistics.getTimeSpentOnTaskWait()); + } + + /** + * Unit Tests to check correct values of queue shrunk operations in + * AbfsOutputStream. + * + */ + @Test + public void testAbfsOutputStreamQueueShrink() { + describe("Testing queue shrink operations by AbfsOutputStream"); + + AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics = + new AbfsOutputStreamStatisticsImpl(); + + //Test for shrinking queue zero time. + assertEquals("Mismatch in queue shrunk operations", 0, + abfsOutputStreamStatistics.getQueueShrunkOps()); + + abfsOutputStreamStatistics.queueShrunk(); + + //Test for shrinking queue 1 time. + assertEquals("Mismatch in queue shrunk operations", 1, + abfsOutputStreamStatistics.getQueueShrunkOps()); + + //Reset statistics for the next test. + abfsOutputStreamStatistics = new AbfsOutputStreamStatisticsImpl(); + + /* + * Entering random values for queueShrunkOps and checking the correctness + * of summation for the statistic. + */ + int randomQueueValues = new Random().nextInt(HIGH_RANGE_FOR_RANDOM_VALUE); + for (int i = 0; i < randomQueueValues * OPERATIONS; i++) { + abfsOutputStreamStatistics.queueShrunk(); + } + /* + * Test for random times incrementing queue shrunk operations. + */ + assertEquals("Mismatch in queue shrunk operations", + randomQueueValues * OPERATIONS, + abfsOutputStreamStatistics.getQueueShrunkOps()); + } +}