HADOOP-16910 : ABFS Streams to update FileSystem.Statistics counters on IO. (#1918). Contributed by Mehakmeet Singh.

This commit is contained in:
Mehakmeet Singh 2020-03-31 18:19:09 +05:30 committed by Steve Loughran
parent 54a64e542e
commit f74a571fdf
7 changed files with 275 additions and 21 deletions
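In effect, AbfsInputStream and AbfsOutputStream now bump the per-FileSystem read and write operation counters on every user-level read() and write() call, so the standard FileSystem.Statistics numbers become meaningful for the abfs scheme. Below is a minimal caller-side sketch (not part of the patch) of what this makes observable; the class name, URI, container and path are placeholders, and the two-read-ops expectation follows the reasoning in the test added by this commit.

// Sketch only, assuming an already-configured ABFS account.
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AbfsStatsProbe {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(
        new URI("abfs://container@account.dfs.core.windows.net/"), conf);
    Path path = new Path("/tmp/statsProbe");

    try (FSDataOutputStream out = fs.create(path, true)) {
      out.write("hello".getBytes());  // counted as one write operation
    }
    try (FSDataInputStream in = fs.open(path)) {
      in.read(new byte[5], 0, 5);     // read() plus the remote GET: two read operations
    }

    // Per-scheme counters maintained by the FileSystem layer.
    FileSystem.Statistics stats = FileSystem.getStatistics("abfs", fs.getClass());
    System.out.println("readOps=" + stats.getReadOps()
        + " writeOps=" + stats.getWriteOps());
  }
}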

AzureBlobFileSystem.java View File

@@ -188,7 +188,7 @@ public FSDataOutputStream create(final Path f, final FsPermission permission, fi
Path qualifiedPath = makeQualified(f);
try {
- OutputStream outputStream = abfsStore.createFile(qualifiedPath, overwrite,
+ OutputStream outputStream = abfsStore.createFile(qualifiedPath, statistics, overwrite,
permission == null ? FsPermission.getFileDefault() : permission, FsPermission.getUMask(getConf()));
return new FSDataOutputStream(outputStream, statistics);
} catch(AzureBlobFileSystemException ex) {
@@ -250,7 +250,7 @@ public FSDataOutputStream append(final Path f, final int bufferSize, final Progr
Path qualifiedPath = makeQualified(f);
try {
- OutputStream outputStream = abfsStore.openFileForWrite(qualifiedPath, false);
+ OutputStream outputStream = abfsStore.openFileForWrite(qualifiedPath, statistics, false);
return new FSDataOutputStream(outputStream, statistics);
} catch(AzureBlobFileSystemException ex) {
checkException(f, ex);

AzureBlobFileSystemStore.java View File

@@ -51,6 +51,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -97,8 +99,6 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.http.client.utils.URIBuilder;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_EQUALS;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_FORWARD_SLASH;
@@ -391,8 +391,10 @@ public void deleteFilesystem() throws AzureBlobFileSystemException {
}
}
- public OutputStream createFile(final Path path, final boolean overwrite, final FsPermission permission,
-   final FsPermission umask) throws AzureBlobFileSystemException {
+ public OutputStream createFile(final Path path,
+   final FileSystem.Statistics statistics,
+   final boolean overwrite, final FsPermission permission,
+   final FsPermission umask) throws AzureBlobFileSystemException {
try (AbfsPerfInfo perfInfo = startTracking("createFile", "createPath")) {
boolean isNamespaceEnabled = getIsNamespaceEnabled();
LOG.debug("createFile filesystem: {} path: {} overwrite: {} permission: {} umask: {} isNamespaceEnabled: {}",
@@ -409,12 +411,13 @@ public OutputStream createFile(final Path path, final boolean overwrite, final F
perfInfo.registerResult(op.getResult()).registerSuccess(true);
return new AbfsOutputStream(
- client,
- AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
- 0,
- abfsConfiguration.getWriteBufferSize(),
- abfsConfiguration.isFlushEnabled(),
- abfsConfiguration.isOutputStreamFlushDisabled());
+ client,
+ statistics,
+ AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
+ 0,
+ abfsConfiguration.getWriteBufferSize(),
+ abfsConfiguration.isFlushEnabled(),
+ abfsConfiguration.isOutputStreamFlushDisabled());
}
}
@@ -468,7 +471,7 @@ public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statist
}
}
- public OutputStream openFileForWrite(final Path path, final boolean overwrite) throws
+ public OutputStream openFileForWrite(final Path path, final FileSystem.Statistics statistics, final boolean overwrite) throws
AzureBlobFileSystemException {
try (AbfsPerfInfo perfInfo = startTracking("openFileForWrite", "getPathStatus")) {
LOG.debug("openFileForWrite filesystem: {} path: {} overwrite: {}",
@@ -495,12 +498,13 @@ public OutputStream openFileForWrite(final Path path, final boolean overwrite) t
perfInfo.registerSuccess(true);
return new AbfsOutputStream(
- client,
- AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
- offset,
- abfsConfiguration.getWriteBufferSize(),
- abfsConfiguration.isFlushEnabled(),
- abfsConfiguration.isOutputStreamFlushDisabled());
+ client,
+ statistics,
+ AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
+ offset,
+ abfsConfiguration.getWriteBufferSize(),
+ abfsConfiguration.isFlushEnabled(),
+ abfsConfiguration.isOutputStreamFlushDisabled());
}
}
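The store-side change above is plumbing: the FileSystem.Statistics instance that AzureBlobFileSystem already hands to FSDataOutputStream is now also passed into AbfsOutputStream, so the stream itself can count operations (see the AbfsOutputStream hunks below). A stripped-down sketch of that pattern follows; CountingOutputStream is an invented name, not the real AbfsOutputStream.

import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.fs.FileSystem;

/** Illustrative only: wraps a stream and counts write operations. */
class CountingOutputStream extends OutputStream {
  private final OutputStream wrapped;
  private final FileSystem.Statistics statistics;  // may be null

  CountingOutputStream(OutputStream wrapped, FileSystem.Statistics statistics) {
    this.wrapped = wrapped;
    this.statistics = statistics;
  }

  @Override
  public void write(int b) throws IOException {
    wrapped.write(b);
    incrementWriteOps();
  }

  @Override
  public void write(byte[] data, int off, int len) throws IOException {
    wrapped.write(data, off, len);
    // One operation per call, not per byte, mirroring the AbfsOutputStream change.
    incrementWriteOps();
  }

  private void incrementWriteOps() {
    if (statistics != null) {  // the stream may be created without statistics
      statistics.incrementWriteOps(1);
    }
  }
}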

AbfsInputStream.java View File

@@ -101,6 +101,7 @@ public synchronized int read(final byte[] b, final int off, final int len) throw
int currentLen = len;
int lastReadBytes;
int totalReadBytes = 0;
+ incrementReadOps();
do {
lastReadBytes = readOneBlock(b, currentOff, currentLen);
if (lastReadBytes > 0) {
@@ -201,6 +202,7 @@ private int readInternal(final long position, final byte[] b, final int offset,
// try reading from buffers first
receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
if (receivedBytes > 0) {
+ incrementReadOps();
return receivedBytes;
}
@@ -236,6 +238,7 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) {
op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
+ incrementReadOps();
} catch (AzureBlobFileSystemException ex) {
if (ex instanceof AbfsRestOperationException) {
AbfsRestOperationException ere = (AbfsRestOperationException) ex;
@@ -252,6 +255,15 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti
return (int) bytesRead;
}
+ /**
+  * Increment Read Operations.
+  */
+ private void incrementReadOps() {
+   if (statistics != null) {
+     statistics.incrementReadOps(1);
+   }
+ }
/**
* Seek to given position in stream.
* @param n position to seek to

AbfsOutputStream.java View File

@@ -39,6 +39,7 @@
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.io.ElasticByteBufferPool;
+ import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
@@ -78,14 +79,18 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
private final ElasticByteBufferPool byteBufferPool
= new ElasticByteBufferPool();
+ private final Statistics statistics;
public AbfsOutputStream(
final AbfsClient client,
+ final Statistics statistics,
final String path,
final long position,
final int bufferSize,
final boolean supportFlush,
final boolean disableOutputStreamFlush) {
this.client = client;
+ this.statistics = statistics;
this.path = path;
this.position = position;
this.closed = false;
@@ -181,6 +186,16 @@ public synchronized void write(final byte[] data, final int off, final int lengt
writableBytes = bufferSize - bufferIndex;
}
+ incrementWriteOps();
}
+ /**
+  * Increment Write Operations.
+  */
+ private void incrementWriteOps() {
+   if (statistics != null) {
+     statistics.incrementWriteOps(1);
+   }
+ }
/**

AbstractAbfsTestWithTimeout.java View File

@@ -17,12 +17,19 @@
*/
package org.apache.hadoop.fs.azurebfs;
+ import java.io.IOException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.apache.hadoop.fs.FSDataInputStream;
+ import org.apache.hadoop.fs.Path;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_TIMEOUT;
@@ -31,6 +38,9 @@
* This class does not attempt to bind to Azure.
*/
public class AbstractAbfsTestWithTimeout extends Assert {
+ private static final Logger LOG =
+   LoggerFactory.getLogger(AbstractAbfsTestWithTimeout.class);
/**
* The name of the current method.
*/
@@ -67,4 +77,53 @@ public void nameThread() {
protected int getTestTimeoutMillis() {
return TEST_TIMEOUT;
}
/**
* Describe a test in the logs.
*
* @param text text to print
* @param args arguments to format in the printing
*/
protected void describe(String text, Object... args) {
LOG.info("\n\n{}: {}\n",
methodName.getMethodName(),
String.format(text, args));
}
/**
* Validate the contents written to a file in ABFS.
*
* @param fs AzureBlobFileSystem
* @param path Path of the file
* @param originalByteArray original byte array
* @return true if the file content matches the original byte array, else false
* @throws IOException if reading the file fails
*/
protected boolean validateContent(AzureBlobFileSystem fs, Path path,
byte[] originalByteArray)
throws IOException {
int pos = 0;
int lenOfOriginalByteArray = originalByteArray.length;
try (FSDataInputStream in = fs.open(path)) {
byte valueOfContentAtPos = (byte) in.read();
while (valueOfContentAtPos != -1 && pos < lenOfOriginalByteArray) {
if (originalByteArray[pos] != valueOfContentAtPos) {
assertEquals("Mismatch in content validation at position {}", pos,
originalByteArray[pos], valueOfContentAtPos);
return false;
}
valueOfContentAtPos = (byte) in.read();
pos++;
}
if (valueOfContentAtPos != -1) {
assertEquals("Expected end of file", -1, valueOfContentAtPos);
return false;
}
return true;
}
}
}

ITestAbfsStreamStatistics.java View File

@@ -0,0 +1,157 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
/**
* Test ABFS stream read and write operation statistics.
*/
public class ITestAbfsStreamStatistics extends AbstractAbfsIntegrationTest {
public ITestAbfsStreamStatistics() throws Exception {
}
private static final Logger LOG =
LoggerFactory.getLogger(ITestAbfsStreamStatistics.class);
private static final int LARGE_NUMBER_OF_OPS = 999999;
/**
* Testing {@code incrementReadOps()} in class {@code AbfsInputStream} and
* {@code incrementWriteOps()} in class {@code AbfsOutputStream}.
*
* @throws Exception
*/
@Test
public void testAbfsStreamOps() throws Exception {
describe("Test to see correct population of read and write operations in "
+ "Abfs");
final AzureBlobFileSystem fs = getFileSystem();
Path smallOperationsFile = new Path("testOneReadWriteOps");
Path largeOperationsFile = new Path("testLargeReadWriteOps");
FileSystem.Statistics statistics = fs.getFsStatistics();
String testReadWriteOps = "test this";
statistics.reset();
//Test for zero write operation
assertReadWriteOps("write", 0, statistics.getWriteOps());
//Test for zero read operation
assertReadWriteOps("read", 0, statistics.getReadOps());
FSDataOutputStream outForOneOperation = null;
FSDataInputStream inForOneOperation = null;
try {
outForOneOperation = fs.create(smallOperationsFile);
statistics.reset();
outForOneOperation.write(testReadWriteOps.getBytes());
//Test for a single write operation
assertReadWriteOps("write", 1, statistics.getWriteOps());
//Flush the output stream so that the written content can be read
outForOneOperation.hflush();
inForOneOperation = fs.open(smallOperationsFile);
statistics.reset();
int result = inForOneOperation.read(testReadWriteOps.getBytes(), 0,
testReadWriteOps.getBytes().length);
LOG.info("Result of Read operation : {}", result);
/*
Expecting a read_ops value of 2 after reading the full content of the
file (3 if anything is served from the read buffer as well).
Reason: the read() call increments read_ops to 1, and the remote read
through AbfsClient (HTTP GET) increments it to 2.
*/
assertReadWriteOps("read", 2, statistics.getReadOps());
} finally {
IOUtils.cleanupWithLogger(LOG, inForOneOperation,
outForOneOperation);
}
//Validate that the content was written to smallOperationsFile
assertTrue("Mismatch in content validation",
validateContent(fs, smallOperationsFile,
testReadWriteOps.getBytes()));
FSDataOutputStream outForLargeOperations = null;
FSDataInputStream inForLargeOperations = null;
StringBuilder largeOperationsValidationString = new StringBuilder();
try {
outForLargeOperations = fs.create(largeOperationsFile);
statistics.reset();
int largeValue = LARGE_NUMBER_OF_OPS;
for (int i = 0; i < largeValue; i++) {
outForLargeOperations.write(testReadWriteOps.getBytes());
//Creating the String for content Validation
largeOperationsValidationString.append(testReadWriteOps);
}
LOG.info("Number of bytes of Large data written: {}",
largeOperationsValidationString.toString().getBytes().length);
//Test for LARGE_NUMBER_OF_OPS write operations
assertReadWriteOps("write", largeValue, statistics.getWriteOps());
inForLargeOperations = fs.open(largeOperationsFile);
for (int i = 0; i < largeValue; i++) {
inForLargeOperations
.read(testReadWriteOps.getBytes(), 0,
testReadWriteOps.getBytes().length);
}
//Test for LARGE_NUMBER_OF_OPS read operations
assertReadWriteOps("read", largeValue, statistics.getReadOps());
} finally {
IOUtils.cleanupWithLogger(LOG, inForLargeOperations,
outForLargeOperations);
}
//Validate that the content was written to largeOperationsFile
assertTrue("Mismatch in content validation",
validateContent(fs, largeOperationsFile,
largeOperationsValidationString.toString().getBytes()));
}
/**
* Generic method to assert both read and write operations.
*
* @param operation the operation being asserted ("read" or "write")
* @param expectedValue the expected operation count
* @param actualValue the actual operation count
*/
private void assertReadWriteOps(String operation, long expectedValue,
long actualValue) {
assertEquals("Mismatch in " + operation + " operations", expectedValue,
actualValue);
}
}

ITestAzureBlobFileSystemOauth.java View File

@@ -22,18 +22,21 @@
import java.io.InputStream;
import java.util.Map;
- import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
import org.junit.Assume;
import org.junit.Test;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
+ import org.apache.hadoop.io.IOUtils;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET;
@@ -52,6 +55,8 @@ public class ITestAzureBlobFileSystemOauth extends AbstractAbfsIntegrationTest{
private static final Path FILE_PATH = new Path("/testFile");
private static final Path EXISTED_FILE_PATH = new Path("/existedFile");
private static final Path EXISTED_FOLDER_PATH = new Path("/existedFolder");
+ private static final Logger LOG =
+   LoggerFactory.getLogger(ITestAzureBlobFileSystemOauth.class);
public ITestAzureBlobFileSystemOauth() throws Exception {
Assume.assumeTrue(this.getAuthType() == AuthType.OAuth);
@@ -143,9 +148,11 @@ public void testBlobDataReader() throws Exception {
// TEST WRITE FILE
try {
- abfsStore.openFileForWrite(EXISTED_FILE_PATH, true);
+ abfsStore.openFileForWrite(EXISTED_FILE_PATH, fs.getFsStatistics(), true);
} catch (AbfsRestOperationException e) {
assertEquals(AzureServiceErrorCode.AUTHORIZATION_PERMISSION_MISS_MATCH, e.getErrorCode());
+ } finally {
+   IOUtils.cleanupWithLogger(LOG, abfsStore);
}
}