HADOOP-16961. ABFS: Adding metrics to AbfsInputStream (#2076)

Contributed by Mehakmeet Singh.
This commit is contained in:
Mehakmeet Singh 2020-07-03 16:11:35 +05:30 committed by Thomas Marquardt
parent bbd3278d09
commit 7c9b459786
No known key found for this signature in database
GPG Key ID: AEB30C9E78868287
7 changed files with 732 additions and 0 deletions

View File

@ -87,6 +87,7 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl;
@ -512,6 +513,7 @@ public class AzureBlobFileSystemStore implements Closeable {
.withReadBufferSize(abfsConfiguration.getReadBufferSize())
.withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
.withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
.withStreamStatistics(new AbfsInputStreamStatisticsImpl())
.build();
}

View File

@ -68,6 +68,9 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
// of valid bytes in buffer)
private boolean closed = false;
/** Stream statistics. */
private final AbfsInputStreamStatistics streamStatistics;
public AbfsInputStream(
final AbfsClient client,
final Statistics statistics,
@ -86,6 +89,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
this.readAheadEnabled = true;
this.cachedSasToken = new CachedSASToken(
abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
this.streamStatistics = abfsInputStreamContext.getStreamStatistics();
}
public String getPath() {
@ -105,10 +109,21 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
@Override
public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
// check if buffer is null before logging the length
if (b != null) {
LOG.debug("read requested b.length = {} offset = {} len = {}", b.length,
off, len);
} else {
LOG.debug("read requested b = null offset = {} len = {}", off, len);
}
int currentOff = off;
int currentLen = len;
int lastReadBytes;
int totalReadBytes = 0;
if (streamStatistics != null) {
streamStatistics.readOperationStarted(off, len);
}
incrementReadOps();
do {
lastReadBytes = readOneBlock(b, currentOff, currentLen);
@ -130,6 +145,8 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
}
Preconditions.checkNotNull(b);
LOG.debug("read one block requested b.length = {} off {} len {}", b.length,
off, len);
if (len == 0) {
return 0;
@ -155,6 +172,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
bCursor = 0;
limit = 0;
if (buffer == null) {
LOG.debug("created new buffer size {}", bufferSize);
buffer = new byte[bufferSize];
}
@ -183,6 +201,11 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
if (statistics != null) {
statistics.incrementBytesRead(bytesToRead);
}
if (streamStatistics != null) {
// Bytes read from the local buffer.
streamStatistics.bytesReadFromBuffer(bytesToRead);
streamStatistics.bytesRead(bytesToRead);
}
return bytesToRead;
}
@ -200,8 +223,11 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
int numReadAheads = this.readAheadQueueDepth;
long nextSize;
long nextOffset = position;
LOG.debug("read ahead enabled issuing readheads num = {}", numReadAheads);
while (numReadAheads > 0 && nextOffset < contentLength) {
nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
LOG.debug("issuing read ahead requestedOffset = {} requested size {}",
nextOffset, nextSize);
ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize);
nextOffset = nextOffset + nextSize;
numReadAheads--;
@ -211,6 +237,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
if (receivedBytes > 0) {
incrementReadOps();
LOG.debug("Received data from read ahead, not doing remote read");
return receivedBytes;
}
@ -218,6 +245,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
receivedBytes = readRemote(position, b, offset, length);
return receivedBytes;
} else {
LOG.debug("read ahead disabled, reading remote");
return readRemote(position, b, offset, length);
}
}
@ -247,6 +275,11 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length);
op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get());
cachedSasToken.update(op.getSasToken());
if (streamStatistics != null) {
streamStatistics.remoteReadOperation();
}
LOG.debug("issuing HTTP GET request params position = {} b.length = {} "
+ "offset = {} length = {}", position, b.length, offset, length);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
incrementReadOps();
} catch (AzureBlobFileSystemException ex) {
@ -262,6 +295,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
if (bytesRead > Integer.MAX_VALUE) {
throw new IOException("Unexpected Content-Length");
}
LOG.debug("HTTP request read bytes = {}", bytesRead);
return (int) bytesRead;
}
@ -282,6 +316,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
*/
@Override
public synchronized void seek(long n) throws IOException {
LOG.debug("requested seek to position {}", n);
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
@ -292,13 +327,21 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
}
if (streamStatistics != null) {
streamStatistics.seek(n, fCursor);
}
if (n>=fCursor-limit && n<=fCursor) { // within buffer
bCursor = (int) (n-(fCursor-limit));
if (streamStatistics != null) {
streamStatistics.seekInBuffer();
}
return;
}
// next read will read from here
fCursor = n;
LOG.debug("set fCursor to {}", fCursor);
//invalidate buffer
limit = 0;
@ -390,6 +433,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
public synchronized void close() throws IOException {
closed = true;
buffer = null; // de-reference the buffer so it can be GC'ed sooner
LOG.debug("Closing {}", this);
}
/**
@ -443,4 +487,28 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
this.cachedSasToken = cachedSasToken;
}
/**
* Getter for AbfsInputStreamStatistics.
*
* @return an instance of AbfsInputStreamStatistics.
*/
@VisibleForTesting
public AbfsInputStreamStatistics getStreamStatistics() {
return streamStatistics;
}
/**
* Get the statistics of the stream.
* @return a string value.
*/
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(super.toString());
if (streamStatistics != null) {
sb.append("AbfsInputStream@(").append(this.hashCode()).append("){");
sb.append(streamStatistics.toString());
sb.append("}");
}
return sb.toString();
}
}

View File

@ -29,6 +29,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
private boolean tolerateOobAppends;
private AbfsInputStreamStatistics streamStatistics;
public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
super(sasTokenRenewPeriodForStreamsInSeconds);
}
@ -52,6 +54,12 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
return this;
}
public AbfsInputStreamContext withStreamStatistics(
final AbfsInputStreamStatistics streamStatistics) {
this.streamStatistics = streamStatistics;
return this;
}
public AbfsInputStreamContext build() {
// Validation of parameters to be done here.
return this;
@ -68,4 +76,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
public boolean isTolerateOobAppends() {
return tolerateOobAppends;
}
public AbfsInputStreamStatistics getStreamStatistics() {
return streamStatistics;
}
}

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Interface for statistics for the AbfsInputStream.
*/
@InterfaceStability.Unstable
public interface AbfsInputStreamStatistics {
/**
* Seek backwards, incrementing the seek and backward seek counters.
*
* @param negativeOffset how far was the seek?
* This is expected to be negative.
*/
void seekBackwards(long negativeOffset);
/**
* Record a forward seek, adding a seek operation, a forward
* seek operation, and any bytes skipped.
*
* @param skipped number of bytes skipped by reading from the stream.
* If the seek was implemented by a close + reopen, set this to zero.
*/
void seekForwards(long skipped);
/**
* Record a forward or backward seek, adding a seek operation, a forward or
* a backward seek operation, and number of bytes skipped.
*
* @param seekTo seek to the position.
* @param currentPos current position.
*/
void seek(long seekTo, long currentPos);
/**
* Increment the bytes read counter by the number of bytes;
* no-op if the argument is negative.
*
* @param bytes number of bytes read.
*/
void bytesRead(long bytes);
/**
* Record the total bytes read from buffer.
*
* @param bytes number of bytes that are read from buffer.
*/
void bytesReadFromBuffer(long bytes);
/**
* Records the total number of seeks done in the buffer.
*/
void seekInBuffer();
/**
* A {@code read(byte[] buf, int off, int len)} operation has started.
*
* @param pos starting position of the read.
* @param len length of bytes to read.
*/
void readOperationStarted(long pos, long len);
/**
* Records a successful remote read operation.
*/
void remoteReadOperation();
/**
* Makes the string of all the AbfsInputStream statistics.
* @return the string with all the statistics.
*/
@Override
String toString();
}

View File

@ -0,0 +1,205 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
/**
* Stats for the AbfsInputStream.
*/
public class AbfsInputStreamStatisticsImpl
implements AbfsInputStreamStatistics {
private long seekOperations;
private long forwardSeekOperations;
private long backwardSeekOperations;
private long bytesRead;
private long bytesSkippedOnSeek;
private long bytesBackwardsOnSeek;
private long seekInBuffer;
private long readOperations;
private long bytesReadFromBuffer;
private long remoteReadOperations;
/**
* Seek backwards, incrementing the seek and backward seek counters.
*
* @param negativeOffset how far was the seek?
* This is expected to be negative.
*/
@Override
public void seekBackwards(long negativeOffset) {
seekOperations++;
backwardSeekOperations++;
bytesBackwardsOnSeek -= negativeOffset;
}
/**
* Record a forward seek, adding a seek operation, a forward
* seek operation, and any bytes skipped.
*
* @param skipped number of bytes skipped by reading from the stream.
* If the seek was implemented by a close + reopen, set this to zero.
*/
@Override
public void seekForwards(long skipped) {
seekOperations++;
forwardSeekOperations++;
if (skipped > 0) {
bytesSkippedOnSeek += skipped;
}
}
/**
* Record a forward or backward seek, adding a seek operation, a forward or
* a backward seek operation, and number of bytes skipped.
* The seek direction will be calculated based on the parameters.
*
* @param seekTo seek to the position.
* @param currentPos current position.
*/
@Override
public void seek(long seekTo, long currentPos) {
if (seekTo >= currentPos) {
this.seekForwards(seekTo - currentPos);
} else {
this.seekBackwards(currentPos - seekTo);
}
}
/**
* Increment the bytes read counter by the number of bytes;
* no-op if the argument is negative.
*
* @param bytes number of bytes read.
*/
@Override
public void bytesRead(long bytes) {
if (bytes > 0) {
bytesRead += bytes;
}
}
/**
* {@inheritDoc}
*
* Total bytes read from the buffer.
*
* @param bytes number of bytes that are read from buffer.
*/
@Override
public void bytesReadFromBuffer(long bytes) {
if (bytes > 0) {
bytesReadFromBuffer += bytes;
}
}
/**
* {@inheritDoc}
*
* Increment the number of seeks in the buffer.
*/
@Override
public void seekInBuffer() {
seekInBuffer++;
}
/**
* A {@code read(byte[] buf, int off, int len)} operation has started.
*
* @param pos starting position of the read.
* @param len length of bytes to read.
*/
@Override
public void readOperationStarted(long pos, long len) {
readOperations++;
}
/**
* {@inheritDoc}
*
* Increment the counter when a remote read operation occurs.
*/
@Override
public void remoteReadOperation() {
remoteReadOperations++;
}
public long getSeekOperations() {
return seekOperations;
}
public long getForwardSeekOperations() {
return forwardSeekOperations;
}
public long getBackwardSeekOperations() {
return backwardSeekOperations;
}
public long getBytesRead() {
return bytesRead;
}
public long getBytesSkippedOnSeek() {
return bytesSkippedOnSeek;
}
public long getBytesBackwardsOnSeek() {
return bytesBackwardsOnSeek;
}
public long getSeekInBuffer() {
return seekInBuffer;
}
public long getReadOperations() {
return readOperations;
}
public long getBytesReadFromBuffer() {
return bytesReadFromBuffer;
}
public long getRemoteReadOperations() {
return remoteReadOperations;
}
/**
* String operator describes all the current statistics.
* <b>Important: there are no guarantees as to the stability
* of this value.</b>
*
* @return the current values of the stream statistics.
*/
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
"StreamStatistics{");
sb.append(", SeekOperations=").append(seekOperations);
sb.append(", ForwardSeekOperations=").append(forwardSeekOperations);
sb.append(", BackwardSeekOperations=").append(backwardSeekOperations);
sb.append(", BytesSkippedOnSeek=").append(bytesSkippedOnSeek);
sb.append(", BytesBackwardsOnSeek=").append(bytesBackwardsOnSeek);
sb.append(", seekInBuffer=").append(seekInBuffer);
sb.append(", BytesRead=").append(bytesRead);
sb.append(", ReadOperations=").append(readOperations);
sb.append(", bytesReadFromBuffer=").append(bytesReadFromBuffer);
sb.append(", remoteReadOperations=").append(remoteReadOperations);
sb.append('}');
return sb.toString();
}
}

View File

@ -0,0 +1,297 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs;
import java.io.IOException;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.io.IOUtils;
public class ITestAbfsInputStreamStatistics
extends AbstractAbfsIntegrationTest {
private static final int OPERATIONS = 10;
private static final Logger LOG =
LoggerFactory.getLogger(ITestAbfsInputStreamStatistics.class);
private static final int ONE_MB = 1024 * 1024;
private static final int ONE_KB = 1024;
private byte[] defBuffer = new byte[ONE_MB];
public ITestAbfsInputStreamStatistics() throws Exception {
}
/**
* Test to check the initial values of the AbfsInputStream statistics.
*/
@Test
public void testInitValues() throws IOException {
describe("Testing the initial values of AbfsInputStream Statistics");
AzureBlobFileSystem fs = getFileSystem();
AzureBlobFileSystemStore abfss = fs.getAbfsStore();
Path initValuesPath = path(getMethodName());
AbfsOutputStream outputStream = null;
AbfsInputStream inputStream = null;
try {
outputStream = createAbfsOutputStreamWithFlushEnabled(fs, initValuesPath);
inputStream = abfss.openFileForRead(initValuesPath, fs.getFsStatistics());
AbfsInputStreamStatisticsImpl stats =
(AbfsInputStreamStatisticsImpl) inputStream.getStreamStatistics();
checkInitValue(stats.getSeekOperations(), "seekOps");
checkInitValue(stats.getForwardSeekOperations(), "forwardSeekOps");
checkInitValue(stats.getBackwardSeekOperations(), "backwardSeekOps");
checkInitValue(stats.getBytesRead(), "bytesRead");
checkInitValue(stats.getBytesSkippedOnSeek(), "bytesSkippedOnSeek");
checkInitValue(stats.getBytesBackwardsOnSeek(), "bytesBackwardsOnSeek");
checkInitValue(stats.getSeekInBuffer(), "seekInBuffer");
checkInitValue(stats.getReadOperations(), "readOps");
checkInitValue(stats.getBytesReadFromBuffer(), "bytesReadFromBuffer");
checkInitValue(stats.getRemoteReadOperations(), "remoteReadOps");
} finally {
IOUtils.cleanupWithLogger(LOG, outputStream, inputStream);
}
}
/**
* Test to check statistics from seek operation in AbfsInputStream.
*/
@Test
public void testSeekStatistics() throws IOException {
describe("Testing the values of statistics from seek operations in "
+ "AbfsInputStream");
AzureBlobFileSystem fs = getFileSystem();
AzureBlobFileSystemStore abfss = fs.getAbfsStore();
Path seekStatPath = path(getMethodName());
AbfsOutputStream out = null;
AbfsInputStream in = null;
try {
out = createAbfsOutputStreamWithFlushEnabled(fs, seekStatPath);
//Writing a default buffer in a file.
out.write(defBuffer);
out.hflush();
in = abfss.openFileForRead(seekStatPath, fs.getFsStatistics());
/*
* Writing 1MB buffer to the file, this would make the fCursor(Current
* position of cursor) to the end of file.
*/
int result = in.read(defBuffer, 0, ONE_MB);
LOG.info("Result of read : {}", result);
/*
* Seeking to start of file and then back to end would result in a
* backward and a forward seek respectively 10 times.
*/
for (int i = 0; i < OPERATIONS; i++) {
in.seek(0);
in.seek(ONE_MB);
}
AbfsInputStreamStatisticsImpl stats =
(AbfsInputStreamStatisticsImpl) in.getStreamStatistics();
LOG.info("STATISTICS: {}", stats.toString());
/*
* seekOps - Since we are doing backward and forward seek OPERATIONS
* times, total seeks would be 2 * OPERATIONS.
*
* backwardSeekOps - Since we are doing a backward seek inside a loop
* for OPERATION times, total backward seeks would be OPERATIONS.
*
* forwardSeekOps - Since we are doing a forward seek inside a loop
* for OPERATION times, total forward seeks would be OPERATIONS.
*
* bytesBackwardsOnSeek - Since we are doing backward seeks from end of
* file in a ONE_MB file each time, this would mean the bytes from
* backward seek would be OPERATIONS * ONE_MB. Since this is backward
* seek this value is expected be to be negative.
*
* bytesSkippedOnSeek - Since, we move from start to end in seek, but
* our fCursor(position of cursor) always remain at end of file, this
* would mean no bytes were skipped on seek. Since, all forward seeks
* are in buffer.
*
* seekInBuffer - Since all seeks were in buffer, the seekInBuffer
* would be equal to 2 * OPERATIONS.
*
*/
assertEquals("Mismatch in seekOps value", 2 * OPERATIONS,
stats.getSeekOperations());
assertEquals("Mismatch in backwardSeekOps value", OPERATIONS,
stats.getBackwardSeekOperations());
assertEquals("Mismatch in forwardSeekOps value", OPERATIONS,
stats.getForwardSeekOperations());
assertEquals("Mismatch in bytesBackwardsOnSeek value",
-1 * OPERATIONS * ONE_MB, stats.getBytesBackwardsOnSeek());
assertEquals("Mismatch in bytesSkippedOnSeek value",
0, stats.getBytesSkippedOnSeek());
assertEquals("Mismatch in seekInBuffer value", 2 * OPERATIONS,
stats.getSeekInBuffer());
in.close();
// Verifying whether stats are readable after stream is closed.
LOG.info("STATISTICS after closing: {}", stats.toString());
} finally {
IOUtils.cleanupWithLogger(LOG, out, in);
}
}
/**
* Test to check statistics value from read operation in AbfsInputStream.
*/
@Test
public void testReadStatistics() throws IOException {
describe("Testing the values of statistics from read operation in "
+ "AbfsInputStream");
AzureBlobFileSystem fs = getFileSystem();
AzureBlobFileSystemStore abfss = fs.getAbfsStore();
Path readStatPath = path(getMethodName());
AbfsOutputStream out = null;
AbfsInputStream in = null;
try {
out = createAbfsOutputStreamWithFlushEnabled(fs, readStatPath);
/*
* Writing 1MB buffer to the file.
*/
out.write(defBuffer);
out.hflush();
in = abfss.openFileForRead(readStatPath, fs.getFsStatistics());
/*
* Doing file read 10 times.
*/
for (int i = 0; i < OPERATIONS; i++) {
in.read();
}
AbfsInputStreamStatisticsImpl stats =
(AbfsInputStreamStatisticsImpl) in.getStreamStatistics();
LOG.info("STATISTICS: {}", stats.toString());
/*
* bytesRead - Since each time a single byte is read, total
* bytes read would be equal to OPERATIONS.
*
* readOps - Since each time read operation is performed OPERATIONS
* times, total number of read operations would be equal to OPERATIONS.
*
* remoteReadOps - Only a single remote read operation is done. Hence,
* total remote read ops is 1.
*
*/
assertEquals("Mismatch in bytesRead value", OPERATIONS,
stats.getBytesRead());
assertEquals("Mismatch in readOps value", OPERATIONS,
stats.getReadOperations());
assertEquals("Mismatch in remoteReadOps value", 1,
stats.getRemoteReadOperations());
in.close();
// Verifying if stats are still readable after stream is closed.
LOG.info("STATISTICS after closing: {}", stats.toString());
} finally {
IOUtils.cleanupWithLogger(LOG, out, in);
}
}
/**
* Testing AbfsInputStream works with null Statistics.
*/
@Test
public void testWithNullStreamStatistics() throws IOException {
describe("Testing AbfsInputStream operations with statistics as null");
AzureBlobFileSystem fs = getFileSystem();
Path nullStatFilePath = path(getMethodName());
byte[] oneKbBuff = new byte[ONE_KB];
// Creating an AbfsInputStreamContext instance with null StreamStatistics.
AbfsInputStreamContext abfsInputStreamContext =
new AbfsInputStreamContext(
getConfiguration().getSasTokenRenewPeriodForStreamsInSeconds())
.withReadBufferSize(getConfiguration().getReadBufferSize())
.withReadAheadQueueDepth(getConfiguration().getReadAheadQueueDepth())
.withStreamStatistics(null)
.build();
AbfsOutputStream out = null;
AbfsInputStream in = null;
try {
out = createAbfsOutputStreamWithFlushEnabled(fs, nullStatFilePath);
// Writing a 1KB buffer in the file.
out.write(oneKbBuff);
out.hflush();
// AbfsRestOperation Instance required for eTag.
AbfsRestOperation abfsRestOperation =
fs.getAbfsClient().getPathStatus(nullStatFilePath.toUri().getPath(), false);
// AbfsInputStream with no StreamStatistics.
in = new AbfsInputStream(fs.getAbfsClient(), null,
nullStatFilePath.toUri().getPath(), ONE_KB,
abfsInputStreamContext,
abfsRestOperation.getResult().getResponseHeader("ETag"));
// Verifying that AbfsInputStream Operations works with null statistics.
assertNotEquals("AbfsInputStream read() with null statistics should "
+ "work", -1, in.read());
in.seek(ONE_KB);
// Verifying toString() with no StreamStatistics.
LOG.info("AbfsInputStream: {}", in.toString());
} finally {
IOUtils.cleanupWithLogger(LOG, out, in);
}
}
/**
* Method to assert the initial values of the statistics.
*
* @param actualValue the actual value of the statistics.
* @param statistic the name of operation or statistic being asserted.
*/
private void checkInitValue(long actualValue, String statistic) {
assertEquals("Mismatch in " + statistic + " value", 0, actualValue);
}
}

View File

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs;
import org.junit.Test;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
public class TestAbfsInputStreamStatistics extends AbstractAbfsIntegrationTest {
private static final int OPERATIONS = 100;
public TestAbfsInputStreamStatistics() throws Exception {
}
/**
* Test to check the bytesReadFromBuffer statistic value from AbfsInputStream.
*/
@Test
public void testBytesReadFromBufferStatistic() {
describe("Testing bytesReadFromBuffer statistics value in AbfsInputStream");
AbfsInputStreamStatisticsImpl abfsInputStreamStatistics =
new AbfsInputStreamStatisticsImpl();
//Increment the bytesReadFromBuffer value.
for (int i = 0; i < OPERATIONS; i++) {
abfsInputStreamStatistics.bytesReadFromBuffer(1);
}
/*
* Since we incremented the bytesReadFromBuffer OPERATIONS times, this
* should be the expected value.
*/
assertEquals("Mismatch in bytesReadFromBuffer value", OPERATIONS,
abfsInputStreamStatistics.getBytesReadFromBuffer());
}
}