HADOOP-16961. ABFS: Adding metrics to AbfsInputStream (#2076)
Contributed by Mehakmeet Singh.
This commit is contained in:
parent
04abd0eb17
commit
3b5c9a90c0
|
@ -86,6 +86,7 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
|
|||
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamContext;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl;
|
||||
|
@ -511,6 +512,7 @@ public class AzureBlobFileSystemStore implements Closeable {
|
|||
.withReadBufferSize(abfsConfiguration.getReadBufferSize())
|
||||
.withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
|
||||
.withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
|
||||
.withStreamStatistics(new AbfsInputStreamStatisticsImpl())
|
||||
.build();
|
||||
}
|
||||
|
||||
|
|
|
@ -68,6 +68,9 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
|||
// of valid bytes in buffer)
|
||||
private boolean closed = false;
|
||||
|
||||
/** Stream statistics. */
|
||||
private final AbfsInputStreamStatistics streamStatistics;
|
||||
|
||||
public AbfsInputStream(
|
||||
final AbfsClient client,
|
||||
final Statistics statistics,
|
||||
|
@ -86,6 +89,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
|||
this.readAheadEnabled = true;
|
||||
this.cachedSasToken = new CachedSASToken(
|
||||
abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
|
||||
this.streamStatistics = abfsInputStreamContext.getStreamStatistics();
|
||||
}
|
||||
|
||||
public String getPath() {
|
||||
|
@ -105,10 +109,21 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
|||
|
||||
@Override
|
||||
public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
|
||||
// check if buffer is null before logging the length
|
||||
if (b != null) {
|
||||
LOG.debug("read requested b.length = {} offset = {} len = {}", b.length,
|
||||
off, len);
|
||||
} else {
|
||||
LOG.debug("read requested b = null offset = {} len = {}", off, len);
|
||||
}
|
||||
|
||||
int currentOff = off;
|
||||
int currentLen = len;
|
||||
int lastReadBytes;
|
||||
int totalReadBytes = 0;
|
||||
if (streamStatistics != null) {
|
||||
streamStatistics.readOperationStarted(off, len);
|
||||
}
|
||||
incrementReadOps();
|
||||
do {
|
||||
lastReadBytes = readOneBlock(b, currentOff, currentLen);
|
||||
|
@ -130,6 +145,8 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
|||
}
|
||||
|
||||
Preconditions.checkNotNull(b);
|
||||
LOG.debug("read one block requested b.length = {} off {} len {}", b.length,
|
||||
off, len);
|
||||
|
||||
if (len == 0) {
|
||||
return 0;
|
||||
|
@ -155,6 +172,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
|||
bCursor = 0;
|
||||
limit = 0;
|
||||
if (buffer == null) {
|
||||
LOG.debug("created new buffer size {}", bufferSize);
|
||||
buffer = new byte[bufferSize];
|
||||
}
|
||||
|
||||
|
@ -183,6 +201,11 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
|||
if (statistics != null) {
|
||||
statistics.incrementBytesRead(bytesToRead);
|
||||
}
|
||||
if (streamStatistics != null) {
|
||||
// Bytes read from the local buffer.
|
||||
streamStatistics.bytesReadFromBuffer(bytesToRead);
|
||||
streamStatistics.bytesRead(bytesToRead);
|
||||
}
|
||||
return bytesToRead;
|
||||
}
|
||||
|
||||
|
@ -200,8 +223,11 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
|||
int numReadAheads = this.readAheadQueueDepth;
|
||||
long nextSize;
|
||||
long nextOffset = position;
|
||||
LOG.debug("read ahead enabled issuing readheads num = {}", numReadAheads);
|
||||
while (numReadAheads > 0 && nextOffset < contentLength) {
|
||||
nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
|
||||
LOG.debug("issuing read ahead requestedOffset = {} requested size {}",
|
||||
nextOffset, nextSize);
|
||||
ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize);
|
||||
nextOffset = nextOffset + nextSize;
|
||||
numReadAheads--;
|
||||
|
@ -211,6 +237,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
|||
receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
|
||||
if (receivedBytes > 0) {
|
||||
incrementReadOps();
|
||||
LOG.debug("Received data from read ahead, not doing remote read");
|
||||
return receivedBytes;
|
||||
}
|
||||
|
||||
|
@ -218,6 +245,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
|||
receivedBytes = readRemote(position, b, offset, length);
|
||||
return receivedBytes;
|
||||
} else {
|
||||
LOG.debug("read ahead disabled, reading remote");
|
||||
return readRemote(position, b, offset, length);
|
||||
}
|
||||
}
|
||||
|
@ -247,6 +275,11 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
|||
LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length);
|
||||
op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get());
|
||||
cachedSasToken.update(op.getSasToken());
|
||||
if (streamStatistics != null) {
|
||||
streamStatistics.remoteReadOperation();
|
||||
}
|
||||
LOG.debug("issuing HTTP GET request params position = {} b.length = {} "
|
||||
+ "offset = {} length = {}", position, b.length, offset, length);
|
||||
perfInfo.registerResult(op.getResult()).registerSuccess(true);
|
||||
incrementReadOps();
|
||||
} catch (AzureBlobFileSystemException ex) {
|
||||
|
@ -262,6 +295,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
|||
if (bytesRead > Integer.MAX_VALUE) {
|
||||
throw new IOException("Unexpected Content-Length");
|
||||
}
|
||||
LOG.debug("HTTP request read bytes = {}", bytesRead);
|
||||
return (int) bytesRead;
|
||||
}
|
||||
|
||||
|
@ -282,6 +316,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
|||
*/
|
||||
@Override
|
||||
public synchronized void seek(long n) throws IOException {
|
||||
LOG.debug("requested seek to position {}", n);
|
||||
if (closed) {
|
||||
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
|
||||
}
|
||||
|
@ -292,13 +327,21 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
|||
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
|
||||
}
|
||||
|
||||
if (streamStatistics != null) {
|
||||
streamStatistics.seek(n, fCursor);
|
||||
}
|
||||
|
||||
if (n>=fCursor-limit && n<=fCursor) { // within buffer
|
||||
bCursor = (int) (n-(fCursor-limit));
|
||||
if (streamStatistics != null) {
|
||||
streamStatistics.seekInBuffer();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// next read will read from here
|
||||
fCursor = n;
|
||||
LOG.debug("set fCursor to {}", fCursor);
|
||||
|
||||
//invalidate buffer
|
||||
limit = 0;
|
||||
|
@ -390,6 +433,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
|||
public synchronized void close() throws IOException {
|
||||
closed = true;
|
||||
buffer = null; // de-reference the buffer so it can be GC'ed sooner
|
||||
LOG.debug("Closing {}", this);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -443,4 +487,28 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
|||
this.cachedSasToken = cachedSasToken;
|
||||
}
|
||||
|
||||
/**
|
||||
* Getter for AbfsInputStreamStatistics.
|
||||
*
|
||||
* @return an instance of AbfsInputStreamStatistics.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public AbfsInputStreamStatistics getStreamStatistics() {
|
||||
return streamStatistics;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the statistics of the stream.
|
||||
* @return a string value.
|
||||
*/
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder sb = new StringBuilder(super.toString());
|
||||
if (streamStatistics != null) {
|
||||
sb.append("AbfsInputStream@(").append(this.hashCode()).append("){");
|
||||
sb.append(streamStatistics.toString());
|
||||
sb.append("}");
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,6 +29,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
|
|||
|
||||
private boolean tolerateOobAppends;
|
||||
|
||||
private AbfsInputStreamStatistics streamStatistics;
|
||||
|
||||
public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
|
||||
super(sasTokenRenewPeriodForStreamsInSeconds);
|
||||
}
|
||||
|
@ -52,6 +54,12 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
|
|||
return this;
|
||||
}
|
||||
|
||||
public AbfsInputStreamContext withStreamStatistics(
|
||||
final AbfsInputStreamStatistics streamStatistics) {
|
||||
this.streamStatistics = streamStatistics;
|
||||
return this;
|
||||
}
|
||||
|
||||
public AbfsInputStreamContext build() {
|
||||
// Validation of parameters to be done here.
|
||||
return this;
|
||||
|
@ -68,4 +76,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
|
|||
public boolean isTolerateOobAppends() {
|
||||
return tolerateOobAppends;
|
||||
}
|
||||
|
||||
public AbfsInputStreamStatistics getStreamStatistics() {
|
||||
return streamStatistics;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,93 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
/**
|
||||
* Interface for statistics for the AbfsInputStream.
|
||||
*/
|
||||
@InterfaceStability.Unstable
|
||||
public interface AbfsInputStreamStatistics {
|
||||
/**
|
||||
* Seek backwards, incrementing the seek and backward seek counters.
|
||||
*
|
||||
* @param negativeOffset how far was the seek?
|
||||
* This is expected to be negative.
|
||||
*/
|
||||
void seekBackwards(long negativeOffset);
|
||||
|
||||
/**
|
||||
* Record a forward seek, adding a seek operation, a forward
|
||||
* seek operation, and any bytes skipped.
|
||||
*
|
||||
* @param skipped number of bytes skipped by reading from the stream.
|
||||
* If the seek was implemented by a close + reopen, set this to zero.
|
||||
*/
|
||||
void seekForwards(long skipped);
|
||||
|
||||
/**
|
||||
* Record a forward or backward seek, adding a seek operation, a forward or
|
||||
* a backward seek operation, and number of bytes skipped.
|
||||
*
|
||||
* @param seekTo seek to the position.
|
||||
* @param currentPos current position.
|
||||
*/
|
||||
void seek(long seekTo, long currentPos);
|
||||
|
||||
/**
|
||||
* Increment the bytes read counter by the number of bytes;
|
||||
* no-op if the argument is negative.
|
||||
*
|
||||
* @param bytes number of bytes read.
|
||||
*/
|
||||
void bytesRead(long bytes);
|
||||
|
||||
/**
|
||||
* Record the total bytes read from buffer.
|
||||
*
|
||||
* @param bytes number of bytes that are read from buffer.
|
||||
*/
|
||||
void bytesReadFromBuffer(long bytes);
|
||||
|
||||
/**
|
||||
* Records the total number of seeks done in the buffer.
|
||||
*/
|
||||
void seekInBuffer();
|
||||
|
||||
/**
|
||||
* A {@code read(byte[] buf, int off, int len)} operation has started.
|
||||
*
|
||||
* @param pos starting position of the read.
|
||||
* @param len length of bytes to read.
|
||||
*/
|
||||
void readOperationStarted(long pos, long len);
|
||||
|
||||
/**
|
||||
* Records a successful remote read operation.
|
||||
*/
|
||||
void remoteReadOperation();
|
||||
|
||||
/**
|
||||
* Makes the string of all the AbfsInputStream statistics.
|
||||
* @return the string with all the statistics.
|
||||
*/
|
||||
@Override
|
||||
String toString();
|
||||
}
|
|
@ -0,0 +1,205 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
/**
|
||||
* Stats for the AbfsInputStream.
|
||||
*/
|
||||
public class AbfsInputStreamStatisticsImpl
|
||||
implements AbfsInputStreamStatistics {
|
||||
private long seekOperations;
|
||||
private long forwardSeekOperations;
|
||||
private long backwardSeekOperations;
|
||||
private long bytesRead;
|
||||
private long bytesSkippedOnSeek;
|
||||
private long bytesBackwardsOnSeek;
|
||||
private long seekInBuffer;
|
||||
private long readOperations;
|
||||
private long bytesReadFromBuffer;
|
||||
private long remoteReadOperations;
|
||||
|
||||
/**
|
||||
* Seek backwards, incrementing the seek and backward seek counters.
|
||||
*
|
||||
* @param negativeOffset how far was the seek?
|
||||
* This is expected to be negative.
|
||||
*/
|
||||
@Override
|
||||
public void seekBackwards(long negativeOffset) {
|
||||
seekOperations++;
|
||||
backwardSeekOperations++;
|
||||
bytesBackwardsOnSeek -= negativeOffset;
|
||||
}
|
||||
|
||||
/**
|
||||
* Record a forward seek, adding a seek operation, a forward
|
||||
* seek operation, and any bytes skipped.
|
||||
*
|
||||
* @param skipped number of bytes skipped by reading from the stream.
|
||||
* If the seek was implemented by a close + reopen, set this to zero.
|
||||
*/
|
||||
@Override
|
||||
public void seekForwards(long skipped) {
|
||||
seekOperations++;
|
||||
forwardSeekOperations++;
|
||||
if (skipped > 0) {
|
||||
bytesSkippedOnSeek += skipped;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Record a forward or backward seek, adding a seek operation, a forward or
|
||||
* a backward seek operation, and number of bytes skipped.
|
||||
* The seek direction will be calculated based on the parameters.
|
||||
*
|
||||
* @param seekTo seek to the position.
|
||||
* @param currentPos current position.
|
||||
*/
|
||||
@Override
|
||||
public void seek(long seekTo, long currentPos) {
|
||||
if (seekTo >= currentPos) {
|
||||
this.seekForwards(seekTo - currentPos);
|
||||
} else {
|
||||
this.seekBackwards(currentPos - seekTo);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment the bytes read counter by the number of bytes;
|
||||
* no-op if the argument is negative.
|
||||
*
|
||||
* @param bytes number of bytes read.
|
||||
*/
|
||||
@Override
|
||||
public void bytesRead(long bytes) {
|
||||
if (bytes > 0) {
|
||||
bytesRead += bytes;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*
|
||||
* Total bytes read from the buffer.
|
||||
*
|
||||
* @param bytes number of bytes that are read from buffer.
|
||||
*/
|
||||
@Override
|
||||
public void bytesReadFromBuffer(long bytes) {
|
||||
if (bytes > 0) {
|
||||
bytesReadFromBuffer += bytes;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*
|
||||
* Increment the number of seeks in the buffer.
|
||||
*/
|
||||
@Override
|
||||
public void seekInBuffer() {
|
||||
seekInBuffer++;
|
||||
}
|
||||
|
||||
/**
|
||||
* A {@code read(byte[] buf, int off, int len)} operation has started.
|
||||
*
|
||||
* @param pos starting position of the read.
|
||||
* @param len length of bytes to read.
|
||||
*/
|
||||
@Override
|
||||
public void readOperationStarted(long pos, long len) {
|
||||
readOperations++;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*
|
||||
* Increment the counter when a remote read operation occurs.
|
||||
*/
|
||||
@Override
|
||||
public void remoteReadOperation() {
|
||||
remoteReadOperations++;
|
||||
}
|
||||
|
||||
public long getSeekOperations() {
|
||||
return seekOperations;
|
||||
}
|
||||
|
||||
public long getForwardSeekOperations() {
|
||||
return forwardSeekOperations;
|
||||
}
|
||||
|
||||
public long getBackwardSeekOperations() {
|
||||
return backwardSeekOperations;
|
||||
}
|
||||
|
||||
public long getBytesRead() {
|
||||
return bytesRead;
|
||||
}
|
||||
|
||||
public long getBytesSkippedOnSeek() {
|
||||
return bytesSkippedOnSeek;
|
||||
}
|
||||
|
||||
public long getBytesBackwardsOnSeek() {
|
||||
return bytesBackwardsOnSeek;
|
||||
}
|
||||
|
||||
public long getSeekInBuffer() {
|
||||
return seekInBuffer;
|
||||
}
|
||||
|
||||
public long getReadOperations() {
|
||||
return readOperations;
|
||||
}
|
||||
|
||||
public long getBytesReadFromBuffer() {
|
||||
return bytesReadFromBuffer;
|
||||
}
|
||||
|
||||
public long getRemoteReadOperations() {
|
||||
return remoteReadOperations;
|
||||
}
|
||||
|
||||
/**
|
||||
* String operator describes all the current statistics.
|
||||
* <b>Important: there are no guarantees as to the stability
|
||||
* of this value.</b>
|
||||
*
|
||||
* @return the current values of the stream statistics.
|
||||
*/
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder sb = new StringBuilder(
|
||||
"StreamStatistics{");
|
||||
sb.append(", SeekOperations=").append(seekOperations);
|
||||
sb.append(", ForwardSeekOperations=").append(forwardSeekOperations);
|
||||
sb.append(", BackwardSeekOperations=").append(backwardSeekOperations);
|
||||
sb.append(", BytesSkippedOnSeek=").append(bytesSkippedOnSeek);
|
||||
sb.append(", BytesBackwardsOnSeek=").append(bytesBackwardsOnSeek);
|
||||
sb.append(", seekInBuffer=").append(seekInBuffer);
|
||||
sb.append(", BytesRead=").append(bytesRead);
|
||||
sb.append(", ReadOperations=").append(readOperations);
|
||||
sb.append(", bytesReadFromBuffer=").append(bytesReadFromBuffer);
|
||||
sb.append(", remoteReadOperations=").append(remoteReadOperations);
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,297 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
|
||||
public class ITestAbfsInputStreamStatistics
|
||||
extends AbstractAbfsIntegrationTest {
|
||||
private static final int OPERATIONS = 10;
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ITestAbfsInputStreamStatistics.class);
|
||||
private static final int ONE_MB = 1024 * 1024;
|
||||
private static final int ONE_KB = 1024;
|
||||
private byte[] defBuffer = new byte[ONE_MB];
|
||||
|
||||
public ITestAbfsInputStreamStatistics() throws Exception {
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to check the initial values of the AbfsInputStream statistics.
|
||||
*/
|
||||
@Test
|
||||
public void testInitValues() throws IOException {
|
||||
describe("Testing the initial values of AbfsInputStream Statistics");
|
||||
|
||||
AzureBlobFileSystem fs = getFileSystem();
|
||||
AzureBlobFileSystemStore abfss = fs.getAbfsStore();
|
||||
Path initValuesPath = path(getMethodName());
|
||||
AbfsOutputStream outputStream = null;
|
||||
AbfsInputStream inputStream = null;
|
||||
|
||||
try {
|
||||
|
||||
outputStream = createAbfsOutputStreamWithFlushEnabled(fs, initValuesPath);
|
||||
inputStream = abfss.openFileForRead(initValuesPath, fs.getFsStatistics());
|
||||
|
||||
AbfsInputStreamStatisticsImpl stats =
|
||||
(AbfsInputStreamStatisticsImpl) inputStream.getStreamStatistics();
|
||||
|
||||
checkInitValue(stats.getSeekOperations(), "seekOps");
|
||||
checkInitValue(stats.getForwardSeekOperations(), "forwardSeekOps");
|
||||
checkInitValue(stats.getBackwardSeekOperations(), "backwardSeekOps");
|
||||
checkInitValue(stats.getBytesRead(), "bytesRead");
|
||||
checkInitValue(stats.getBytesSkippedOnSeek(), "bytesSkippedOnSeek");
|
||||
checkInitValue(stats.getBytesBackwardsOnSeek(), "bytesBackwardsOnSeek");
|
||||
checkInitValue(stats.getSeekInBuffer(), "seekInBuffer");
|
||||
checkInitValue(stats.getReadOperations(), "readOps");
|
||||
checkInitValue(stats.getBytesReadFromBuffer(), "bytesReadFromBuffer");
|
||||
checkInitValue(stats.getRemoteReadOperations(), "remoteReadOps");
|
||||
|
||||
} finally {
|
||||
IOUtils.cleanupWithLogger(LOG, outputStream, inputStream);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to check statistics from seek operation in AbfsInputStream.
|
||||
*/
|
||||
@Test
|
||||
public void testSeekStatistics() throws IOException {
|
||||
describe("Testing the values of statistics from seek operations in "
|
||||
+ "AbfsInputStream");
|
||||
|
||||
AzureBlobFileSystem fs = getFileSystem();
|
||||
AzureBlobFileSystemStore abfss = fs.getAbfsStore();
|
||||
Path seekStatPath = path(getMethodName());
|
||||
|
||||
AbfsOutputStream out = null;
|
||||
AbfsInputStream in = null;
|
||||
|
||||
try {
|
||||
out = createAbfsOutputStreamWithFlushEnabled(fs, seekStatPath);
|
||||
|
||||
//Writing a default buffer in a file.
|
||||
out.write(defBuffer);
|
||||
out.hflush();
|
||||
in = abfss.openFileForRead(seekStatPath, fs.getFsStatistics());
|
||||
|
||||
/*
|
||||
* Writing 1MB buffer to the file, this would make the fCursor(Current
|
||||
* position of cursor) to the end of file.
|
||||
*/
|
||||
int result = in.read(defBuffer, 0, ONE_MB);
|
||||
LOG.info("Result of read : {}", result);
|
||||
|
||||
/*
|
||||
* Seeking to start of file and then back to end would result in a
|
||||
* backward and a forward seek respectively 10 times.
|
||||
*/
|
||||
for (int i = 0; i < OPERATIONS; i++) {
|
||||
in.seek(0);
|
||||
in.seek(ONE_MB);
|
||||
}
|
||||
|
||||
AbfsInputStreamStatisticsImpl stats =
|
||||
(AbfsInputStreamStatisticsImpl) in.getStreamStatistics();
|
||||
|
||||
LOG.info("STATISTICS: {}", stats.toString());
|
||||
|
||||
/*
|
||||
* seekOps - Since we are doing backward and forward seek OPERATIONS
|
||||
* times, total seeks would be 2 * OPERATIONS.
|
||||
*
|
||||
* backwardSeekOps - Since we are doing a backward seek inside a loop
|
||||
* for OPERATION times, total backward seeks would be OPERATIONS.
|
||||
*
|
||||
* forwardSeekOps - Since we are doing a forward seek inside a loop
|
||||
* for OPERATION times, total forward seeks would be OPERATIONS.
|
||||
*
|
||||
* bytesBackwardsOnSeek - Since we are doing backward seeks from end of
|
||||
* file in a ONE_MB file each time, this would mean the bytes from
|
||||
* backward seek would be OPERATIONS * ONE_MB. Since this is backward
|
||||
* seek this value is expected be to be negative.
|
||||
*
|
||||
* bytesSkippedOnSeek - Since, we move from start to end in seek, but
|
||||
* our fCursor(position of cursor) always remain at end of file, this
|
||||
* would mean no bytes were skipped on seek. Since, all forward seeks
|
||||
* are in buffer.
|
||||
*
|
||||
* seekInBuffer - Since all seeks were in buffer, the seekInBuffer
|
||||
* would be equal to 2 * OPERATIONS.
|
||||
*
|
||||
*/
|
||||
assertEquals("Mismatch in seekOps value", 2 * OPERATIONS,
|
||||
stats.getSeekOperations());
|
||||
assertEquals("Mismatch in backwardSeekOps value", OPERATIONS,
|
||||
stats.getBackwardSeekOperations());
|
||||
assertEquals("Mismatch in forwardSeekOps value", OPERATIONS,
|
||||
stats.getForwardSeekOperations());
|
||||
assertEquals("Mismatch in bytesBackwardsOnSeek value",
|
||||
-1 * OPERATIONS * ONE_MB, stats.getBytesBackwardsOnSeek());
|
||||
assertEquals("Mismatch in bytesSkippedOnSeek value",
|
||||
0, stats.getBytesSkippedOnSeek());
|
||||
assertEquals("Mismatch in seekInBuffer value", 2 * OPERATIONS,
|
||||
stats.getSeekInBuffer());
|
||||
|
||||
in.close();
|
||||
// Verifying whether stats are readable after stream is closed.
|
||||
LOG.info("STATISTICS after closing: {}", stats.toString());
|
||||
} finally {
|
||||
IOUtils.cleanupWithLogger(LOG, out, in);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to check statistics value from read operation in AbfsInputStream.
|
||||
*/
|
||||
@Test
|
||||
public void testReadStatistics() throws IOException {
|
||||
describe("Testing the values of statistics from read operation in "
|
||||
+ "AbfsInputStream");
|
||||
|
||||
AzureBlobFileSystem fs = getFileSystem();
|
||||
AzureBlobFileSystemStore abfss = fs.getAbfsStore();
|
||||
Path readStatPath = path(getMethodName());
|
||||
|
||||
AbfsOutputStream out = null;
|
||||
AbfsInputStream in = null;
|
||||
|
||||
try {
|
||||
out = createAbfsOutputStreamWithFlushEnabled(fs, readStatPath);
|
||||
|
||||
/*
|
||||
* Writing 1MB buffer to the file.
|
||||
*/
|
||||
out.write(defBuffer);
|
||||
out.hflush();
|
||||
in = abfss.openFileForRead(readStatPath, fs.getFsStatistics());
|
||||
|
||||
/*
|
||||
* Doing file read 10 times.
|
||||
*/
|
||||
for (int i = 0; i < OPERATIONS; i++) {
|
||||
in.read();
|
||||
}
|
||||
|
||||
AbfsInputStreamStatisticsImpl stats =
|
||||
(AbfsInputStreamStatisticsImpl) in.getStreamStatistics();
|
||||
|
||||
LOG.info("STATISTICS: {}", stats.toString());
|
||||
|
||||
/*
|
||||
* bytesRead - Since each time a single byte is read, total
|
||||
* bytes read would be equal to OPERATIONS.
|
||||
*
|
||||
* readOps - Since each time read operation is performed OPERATIONS
|
||||
* times, total number of read operations would be equal to OPERATIONS.
|
||||
*
|
||||
* remoteReadOps - Only a single remote read operation is done. Hence,
|
||||
* total remote read ops is 1.
|
||||
*
|
||||
*/
|
||||
assertEquals("Mismatch in bytesRead value", OPERATIONS,
|
||||
stats.getBytesRead());
|
||||
assertEquals("Mismatch in readOps value", OPERATIONS,
|
||||
stats.getReadOperations());
|
||||
assertEquals("Mismatch in remoteReadOps value", 1,
|
||||
stats.getRemoteReadOperations());
|
||||
|
||||
in.close();
|
||||
// Verifying if stats are still readable after stream is closed.
|
||||
LOG.info("STATISTICS after closing: {}", stats.toString());
|
||||
} finally {
|
||||
IOUtils.cleanupWithLogger(LOG, out, in);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Testing AbfsInputStream works with null Statistics.
|
||||
*/
|
||||
@Test
|
||||
public void testWithNullStreamStatistics() throws IOException {
|
||||
describe("Testing AbfsInputStream operations with statistics as null");
|
||||
|
||||
AzureBlobFileSystem fs = getFileSystem();
|
||||
Path nullStatFilePath = path(getMethodName());
|
||||
byte[] oneKbBuff = new byte[ONE_KB];
|
||||
|
||||
// Creating an AbfsInputStreamContext instance with null StreamStatistics.
|
||||
AbfsInputStreamContext abfsInputStreamContext =
|
||||
new AbfsInputStreamContext(
|
||||
getConfiguration().getSasTokenRenewPeriodForStreamsInSeconds())
|
||||
.withReadBufferSize(getConfiguration().getReadBufferSize())
|
||||
.withReadAheadQueueDepth(getConfiguration().getReadAheadQueueDepth())
|
||||
.withStreamStatistics(null)
|
||||
.build();
|
||||
|
||||
AbfsOutputStream out = null;
|
||||
AbfsInputStream in = null;
|
||||
|
||||
try {
|
||||
out = createAbfsOutputStreamWithFlushEnabled(fs, nullStatFilePath);
|
||||
|
||||
// Writing a 1KB buffer in the file.
|
||||
out.write(oneKbBuff);
|
||||
out.hflush();
|
||||
|
||||
// AbfsRestOperation Instance required for eTag.
|
||||
AbfsRestOperation abfsRestOperation =
|
||||
fs.getAbfsClient().getPathStatus(nullStatFilePath.toUri().getPath(), false);
|
||||
|
||||
// AbfsInputStream with no StreamStatistics.
|
||||
in = new AbfsInputStream(fs.getAbfsClient(), null,
|
||||
nullStatFilePath.toUri().getPath(), ONE_KB,
|
||||
abfsInputStreamContext,
|
||||
abfsRestOperation.getResult().getResponseHeader("ETag"));
|
||||
|
||||
// Verifying that AbfsInputStream Operations works with null statistics.
|
||||
assertNotEquals("AbfsInputStream read() with null statistics should "
|
||||
+ "work", -1, in.read());
|
||||
in.seek(ONE_KB);
|
||||
|
||||
// Verifying toString() with no StreamStatistics.
|
||||
LOG.info("AbfsInputStream: {}", in.toString());
|
||||
} finally {
|
||||
IOUtils.cleanupWithLogger(LOG, out, in);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Method to assert the initial values of the statistics.
|
||||
*
|
||||
* @param actualValue the actual value of the statistics.
|
||||
* @param statistic the name of operation or statistic being asserted.
|
||||
*/
|
||||
private void checkInitValue(long actualValue, String statistic) {
|
||||
assertEquals("Mismatch in " + statistic + " value", 0, actualValue);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
|
||||
|
||||
public class TestAbfsInputStreamStatistics extends AbstractAbfsIntegrationTest {
|
||||
|
||||
private static final int OPERATIONS = 100;
|
||||
|
||||
public TestAbfsInputStreamStatistics() throws Exception {
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to check the bytesReadFromBuffer statistic value from AbfsInputStream.
|
||||
*/
|
||||
@Test
|
||||
public void testBytesReadFromBufferStatistic() {
|
||||
describe("Testing bytesReadFromBuffer statistics value in AbfsInputStream");
|
||||
|
||||
AbfsInputStreamStatisticsImpl abfsInputStreamStatistics =
|
||||
new AbfsInputStreamStatisticsImpl();
|
||||
|
||||
//Increment the bytesReadFromBuffer value.
|
||||
for (int i = 0; i < OPERATIONS; i++) {
|
||||
abfsInputStreamStatistics.bytesReadFromBuffer(1);
|
||||
}
|
||||
|
||||
/*
|
||||
* Since we incremented the bytesReadFromBuffer OPERATIONS times, this
|
||||
* should be the expected value.
|
||||
*/
|
||||
assertEquals("Mismatch in bytesReadFromBuffer value", OPERATIONS,
|
||||
abfsInputStreamStatistics.getBytesReadFromBuffer());
|
||||
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue