diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index 8eda2f37304..4ddc2e396c4 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -188,7 +188,7 @@ public class AzureBlobFileSystem extends FileSystem {
     Path qualifiedPath = makeQualified(f);
 
     try {
-      OutputStream outputStream = abfsStore.createFile(qualifiedPath, overwrite,
+      OutputStream outputStream = abfsStore.createFile(qualifiedPath, statistics, overwrite,
           permission == null ? FsPermission.getFileDefault() : permission, FsPermission.getUMask(getConf()));
       return new FSDataOutputStream(outputStream, statistics);
     } catch(AzureBlobFileSystemException ex) {
@@ -250,7 +250,7 @@ public class AzureBlobFileSystem extends FileSystem {
     Path qualifiedPath = makeQualified(f);
 
     try {
-      OutputStream outputStream = abfsStore.openFileForWrite(qualifiedPath, false);
+      OutputStream outputStream = abfsStore.openFileForWrite(qualifiedPath, statistics, false);
       return new FSDataOutputStream(outputStream, statistics);
     } catch(AzureBlobFileSystemException ex) {
       checkException(f, ex);
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index bff0e455cf0..a330da40cbf 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -51,6 +51,8 @@ import java.util.Set;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -97,8 +99,6 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.http.client.utils.URIBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_EQUALS;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_FORWARD_SLASH;
@@ -391,8 +391,10 @@ public class AzureBlobFileSystemStore implements Closeable {
     }
   }
 
-  public OutputStream createFile(final Path path, final boolean overwrite, final FsPermission permission,
-      final FsPermission umask) throws AzureBlobFileSystemException {
+  public OutputStream createFile(final Path path,
+      final FileSystem.Statistics statistics,
+      final boolean overwrite, final FsPermission permission,
+      final FsPermission umask) throws AzureBlobFileSystemException {
     try (AbfsPerfInfo perfInfo = startTracking("createFile", "createPath")) {
       boolean isNamespaceEnabled = getIsNamespaceEnabled();
       LOG.debug("createFile filesystem: {} path: {} overwrite: {} permission: {} umask: {} isNamespaceEnabled: {}",
@@ -409,12 +411,13 @@ public class AzureBlobFileSystemStore implements Closeable {
       perfInfo.registerResult(op.getResult()).registerSuccess(true);
 
       return new AbfsOutputStream(
-          client,
-          AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
-          0,
-          abfsConfiguration.getWriteBufferSize(),
-          abfsConfiguration.isFlushEnabled(),
-          abfsConfiguration.isOutputStreamFlushDisabled());
+          client,
+          statistics,
+          AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
+          0,
+          abfsConfiguration.getWriteBufferSize(),
+          abfsConfiguration.isFlushEnabled(),
+          abfsConfiguration.isOutputStreamFlushDisabled());
     }
   }
 
@@ -468,7 +471,7 @@ public class AzureBlobFileSystemStore implements Closeable {
     }
   }
 
-  public OutputStream openFileForWrite(final Path path, final boolean overwrite) throws
+  public OutputStream openFileForWrite(final Path path, final FileSystem.Statistics statistics, final boolean overwrite) throws
       AzureBlobFileSystemException {
     try (AbfsPerfInfo perfInfo = startTracking("openFileForWrite", "getPathStatus")) {
       LOG.debug("openFileForWrite filesystem: {} path: {} overwrite: {}",
@@ -495,12 +498,13 @@ public class AzureBlobFileSystemStore implements Closeable {
       perfInfo.registerSuccess(true);
 
       return new AbfsOutputStream(
-          client,
-          AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
-          offset,
-          abfsConfiguration.getWriteBufferSize(),
-          abfsConfiguration.isFlushEnabled(),
-          abfsConfiguration.isOutputStreamFlushDisabled());
+          client,
+          statistics,
+          AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
+          offset,
+          abfsConfiguration.getWriteBufferSize(),
+          abfsConfiguration.isFlushEnabled(),
+          abfsConfiguration.isOutputStreamFlushDisabled());
     }
   }
 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index 8dc3b8f0707..0c06014a408 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -101,6 +101,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     int currentLen = len;
     int lastReadBytes;
     int totalReadBytes = 0;
+    incrementReadOps();
     do {
       lastReadBytes = readOneBlock(b, currentOff, currentLen);
       if (lastReadBytes > 0) {
@@ -201,6 +202,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
       // try reading from buffers first
       receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
       if (receivedBytes > 0) {
+        incrementReadOps();
         return receivedBytes;
       }
 
@@ -236,6 +238,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) {
       op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag);
       perfInfo.registerResult(op.getResult()).registerSuccess(true);
+      incrementReadOps();
     } catch (AzureBlobFileSystemException ex) {
       if (ex instanceof AbfsRestOperationException) {
         AbfsRestOperationException ere = (AbfsRestOperationException) ex;
@@ -252,6 +255,15 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     return (int) bytesRead;
   }
 
+  /**
+   * Increment Read Operations.
+   */
+  private void incrementReadOps() {
+    if (statistics != null) {
+      statistics.incrementReadOps(1);
+    }
+  }
+
   /**
    * Seek to given position in stream.
   * @param n position to seek to
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 7e9746d118c..e943169a03e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -39,6 +39,7 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.Syncable;
@@ -78,14 +79,18 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
 
   private final ElasticByteBufferPool byteBufferPool = new ElasticByteBufferPool();
 
+  private final Statistics statistics;
+
   public AbfsOutputStream(
       final AbfsClient client,
+      final Statistics statistics,
       final String path,
       final long position,
       final int bufferSize,
       final boolean supportFlush,
       final boolean disableOutputStreamFlush) {
     this.client = client;
+    this.statistics = statistics;
     this.path = path;
     this.position = position;
     this.closed = false;
@@ -181,6 +186,16 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
       writableBytes = bufferSize - bufferIndex;
     }
+    incrementWriteOps();
+  }
+
+  /**
+   * Increment Write Operations.
+   */
+  private void incrementWriteOps() {
+    if (statistics != null) {
+      statistics.incrementWriteOps(1);
+    }
   }
 
   /**
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsTestWithTimeout.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsTestWithTimeout.java
index fee90abeabc..0485422871e 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsTestWithTimeout.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsTestWithTimeout.java
@@ -17,12 +17,19 @@
  */
 package org.apache.hadoop.fs.azurebfs;
 
+import java.io.IOException;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.rules.TestName;
 import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
 
 import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_TIMEOUT;
 
@@ -31,6 +38,9 @@ import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TES
  * This class does not attempt to bind to Azure.
  */
 public class AbstractAbfsTestWithTimeout extends Assert {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AbstractAbfsTestWithTimeout.class);
+
   /**
    * The name of the current method.
    */
@@ -67,4 +77,53 @@
   protected int getTestTimeoutMillis() {
     return TEST_TIMEOUT;
   }
+
+  /**
+   * Describe a test in the logs.
+   *
+   * @param text text to print
+   * @param args arguments to format in the printing
+   */
+  protected void describe(String text, Object... args) {
+    LOG.info("\n\n{}: {}\n",
+        methodName.getMethodName(),
+        String.format(text, args));
+  }
+
+  /**
+   * Validate the contents written to a file in Abfs.
+   *
+   * @param fs AzureBlobFileSystem
+   * @param path Path of the file
+   * @param originalByteArray original byte array
+   * @return true if the file content matches the original byte array, else false
+   * @throws IOException failure while reading the file
+   */
+  protected boolean validateContent(AzureBlobFileSystem fs, Path path,
+      byte[] originalByteArray)
+      throws IOException {
+    int pos = 0;
+    int lenOfOriginalByteArray = originalByteArray.length;
+
+    try (FSDataInputStream in = fs.open(path)) {
+      int valueOfContentAtPos = in.read();
+
+      while (valueOfContentAtPos != -1 && pos < lenOfOriginalByteArray) {
+        if (originalByteArray[pos] != (byte) valueOfContentAtPos) {
+          assertEquals("Mismatch in content validation at position " + pos,
+              originalByteArray[pos], (byte) valueOfContentAtPos);
+          return false;
+        }
+        valueOfContentAtPos = in.read();
+        pos++;
+      }
+      if (valueOfContentAtPos != -1) {
+        assertEquals("Expected end of file", -1, valueOfContentAtPos);
+        return false;
+      }
+      return true;
+    }
+
+  }
+
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStreamStatistics.java
new file mode 100644
index 00000000000..b749f496bbf
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStreamStatistics.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * Test Abfs stream read and write operation counts.
+ */
+
+public class ITestAbfsStreamStatistics extends AbstractAbfsIntegrationTest {
+  public ITestAbfsStreamStatistics() throws Exception {
+  }
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestAbfsStreamStatistics.class);
+
+  private static final int LARGE_NUMBER_OF_OPS = 999999;
+
+  /**
+   * Testing {@code incrementReadOps()} in class {@code AbfsInputStream} and
+   * {@code incrementWriteOps()} in class {@code AbfsOutputStream}.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testAbfsStreamOps() throws Exception {
+    describe("Test to see correct population of read and write operations in "
+        + "Abfs");
+
+    final AzureBlobFileSystem fs = getFileSystem();
+    Path smallOperationsFile = new Path("testOneReadWriteOps");
+    Path largeOperationsFile = new Path("testLargeReadWriteOps");
+    FileSystem.Statistics statistics = fs.getFsStatistics();
+    String testReadWriteOps = "test this";
+    statistics.reset();
+
+    //Test for zero write operation
+    assertReadWriteOps("write", 0, statistics.getWriteOps());
+
+    //Test for zero read operation
+    assertReadWriteOps("read", 0, statistics.getReadOps());
+
+    FSDataOutputStream outForOneOperation = null;
+    FSDataInputStream inForOneOperation = null;
+    try {
+      outForOneOperation = fs.create(smallOperationsFile);
+      statistics.reset();
+      outForOneOperation.write(testReadWriteOps.getBytes());
+
+      //Test for a single write operation
+      assertReadWriteOps("write", 1, statistics.getWriteOps());
+
+      //Flushing the output stream so its content is available to read
+      outForOneOperation.hflush();
+      inForOneOperation = fs.open(smallOperationsFile);
+      statistics.reset();
+      int result = inForOneOperation.read(testReadWriteOps.getBytes(), 0,
+          testReadWriteOps.getBytes().length);
+
+      LOG.info("Result of read operation: {}", result);
+      /*
+      Reading the full content of the file should record two read
+      operations (three if data is also served from the read buffer).
+      Reason: the read() call itself counts as read_ops=1, and the remote
+      read through AbfsClient (HTTP GET) counts as read_ops=2.
+      */
+      assertReadWriteOps("read", 2, statistics.getReadOps());
+
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, inForOneOperation,
+          outForOneOperation);
+    }
+
+    //Validate that the content was correctly written to smallOperationsFile
+    assertTrue("Mismatch in content validation",
+        validateContent(fs, smallOperationsFile,
+            testReadWriteOps.getBytes()));
+
+    FSDataOutputStream outForLargeOperations = null;
+    FSDataInputStream inForLargeOperations = null;
+    StringBuilder largeOperationsValidationString = new StringBuilder();
+    try {
+      outForLargeOperations = fs.create(largeOperationsFile);
+      statistics.reset();
+      int largeValue = LARGE_NUMBER_OF_OPS;
+      for (int i = 0; i < largeValue; i++) {
+        outForLargeOperations.write(testReadWriteOps.getBytes());
+
+        //Building the string used for content validation
+        largeOperationsValidationString.append(testReadWriteOps);
+      }
+      LOG.info("Number of bytes of large data written: {}",
+          largeOperationsValidationString.toString().getBytes().length);
+
+      //Test for LARGE_NUMBER_OF_OPS (999999) write operations
+      assertReadWriteOps("write", largeValue, statistics.getWriteOps());
+
+      inForLargeOperations = fs.open(largeOperationsFile);
+      for (int i = 0; i < largeValue; i++) {
+        inForLargeOperations
+            .read(testReadWriteOps.getBytes(), 0,
+                testReadWriteOps.getBytes().length);
+      }
+
+      //Test for LARGE_NUMBER_OF_OPS (999999) read operations
+      assertReadWriteOps("read", largeValue, statistics.getReadOps());
+
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, inForLargeOperations,
+          outForLargeOperations);
+    }
+    //Validate that the content was correctly written to largeOperationsFile
+    assertTrue("Mismatch in content validation",
+        validateContent(fs, largeOperationsFile,
+            largeOperationsValidationString.toString().getBytes()));
+
+  }
+
+  /**
+   * Generic method to assert both read and write operations.
+   *
+   * @param operation what operation is being asserted
+   * @param expectedValue the expected value
+   * @param actualValue the actual value
+   */
+
+  private void assertReadWriteOps(String operation, long expectedValue,
+      long actualValue) {
+    assertEquals("Mismatch in " + operation + " operations", expectedValue,
+        actualValue);
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemOauth.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemOauth.java
index 533f4712565..e517f685784 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemOauth.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemOauth.java
@@ -22,18 +22,21 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.Map;
 
-import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
 import org.junit.Assume;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
 import org.apache.hadoop.fs.azurebfs.services.AuthType;
+import org.apache.hadoop.io.IOUtils;
 
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET;
@@ -52,6 +55,8 @@ public class ITestAzureBlobFileSystemOauth extends AbstractAbfsIntegrationTest{
   private static final Path FILE_PATH = new Path("/testFile");
   private static final Path EXISTED_FILE_PATH = new Path("/existedFile");
   private static final Path EXISTED_FOLDER_PATH = new Path("/existedFolder");
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestAzureBlobFileSystemOauth.class);
 
   public ITestAzureBlobFileSystemOauth() throws Exception {
     Assume.assumeTrue(this.getAuthType() == AuthType.OAuth);
@@ -143,9 +148,11 @@ public class ITestAzureBlobFileSystemOauth extends AbstractAbfsIntegrationTest{
 
     // TEST WRITE FILE
     try {
-      abfsStore.openFileForWrite(EXISTED_FILE_PATH, true);
+      abfsStore.openFileForWrite(EXISTED_FILE_PATH, fs.getFsStatistics(), true);
     } catch (AbfsRestOperationException e) {
       assertEquals(AzureServiceErrorCode.AUTHORIZATION_PERMISSION_MISS_MATCH,
           e.getErrorCode());
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, abfsStore);
     }
   }
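
Usage note (not part of the patch): with this change, read and write activity on ABFS streams is reflected in the per-FileSystem Statistics object that AzureBlobFileSystem now threads through createFile/openFileForWrite. Below is a minimal sketch of how a caller could observe the new counters. It is an illustration, not code from this patch: the abfs URI is a placeholder, and it reuses the same getFsStatistics() accessor that the new integration test calls, so it assumes visibility from the org.apache.hadoop.fs.azurebfs package.

package org.apache.hadoop.fs.azurebfs;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AbfsStatisticsSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder URI; substitute a real container and account.
    URI uri = new URI("abfs://container@account.dfs.core.windows.net/");
    AzureBlobFileSystem fs =
        (AzureBlobFileSystem) FileSystem.get(uri, new Configuration());

    FileSystem.Statistics statistics = fs.getFsStatistics();
    statistics.reset();

    Path path = new Path("/statsDemo");
    try (FSDataOutputStream out = fs.create(path)) {
      // Each write() lands in AbfsOutputStream.write(), which now calls
      // incrementWriteOps().
      out.write("hello".getBytes());
    }
    try (FSDataInputStream in = fs.open(path)) {
      // A read() counts once in AbfsInputStream.read(), and once more when
      // the data is fetched remotely via AbfsClient (HTTP GET).
      in.read(new byte[5], 0, 5);
    }

    System.out.println("readOps=" + statistics.getReadOps()
        + " writeOps=" + statistics.getWriteOps());
  }
}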