HADOOP-17158. Test timeout for ITestAbfsInputStreamStatistics#testReadAheadCounters (#2272)
Contributed by: Mehakmeet Singh.
This commit is contained in:
parent
c4fb4044b2
commit
84ed6adccc
|
@ -70,6 +70,8 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
||||||
|
|
||||||
/** Stream statistics. */
|
/** Stream statistics. */
|
||||||
private final AbfsInputStreamStatistics streamStatistics;
|
private final AbfsInputStreamStatistics streamStatistics;
|
||||||
|
private long bytesFromReadAhead; // bytes read from readAhead; for testing
|
||||||
|
private long bytesFromRemoteRead; // bytes read remotely; for testing
|
||||||
|
|
||||||
public AbfsInputStream(
|
public AbfsInputStream(
|
||||||
final AbfsClient client,
|
final AbfsClient client,
|
||||||
|
@ -235,6 +237,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
||||||
|
|
||||||
// try reading from buffers first
|
// try reading from buffers first
|
||||||
receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
|
receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
|
||||||
|
bytesFromReadAhead += receivedBytes;
|
||||||
if (receivedBytes > 0) {
|
if (receivedBytes > 0) {
|
||||||
incrementReadOps();
|
incrementReadOps();
|
||||||
LOG.debug("Received data from read ahead, not doing remote read");
|
LOG.debug("Received data from read ahead, not doing remote read");
|
||||||
|
@ -302,6 +305,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
||||||
throw new IOException("Unexpected Content-Length");
|
throw new IOException("Unexpected Content-Length");
|
||||||
}
|
}
|
||||||
LOG.debug("HTTP request read bytes = {}", bytesRead);
|
LOG.debug("HTTP request read bytes = {}", bytesRead);
|
||||||
|
bytesFromRemoteRead += bytesRead;
|
||||||
return (int) bytesRead;
|
return (int) bytesRead;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -503,6 +507,26 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
||||||
return streamStatistics;
|
return streamStatistics;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Getter for bytes read from readAhead buffer that fills asynchronously.
|
||||||
|
*
|
||||||
|
* @return value of the counter in long.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getBytesFromReadAhead() {
|
||||||
|
return bytesFromReadAhead;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Getter for bytes read remotely from the data store.
|
||||||
|
*
|
||||||
|
* @return value of the counter in long.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getBytesFromRemoteRead() {
|
||||||
|
return bytesFromRemoteRead;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the statistics of the stream.
|
* Get the statistics of the stream.
|
||||||
* @return a string value.
|
* @return a string value.
|
||||||
|
|
|
@ -41,9 +41,6 @@ public class ITestAbfsInputStreamStatistics
|
||||||
private static final int ONE_MB = 1024 * 1024;
|
private static final int ONE_MB = 1024 * 1024;
|
||||||
private static final int ONE_KB = 1024;
|
private static final int ONE_KB = 1024;
|
||||||
private static final int CUSTOM_BLOCK_BUFFER_SIZE = 4 * 1024;
|
private static final int CUSTOM_BLOCK_BUFFER_SIZE = 4 * 1024;
|
||||||
private static final int CUSTOM_READ_AHEAD_BUFFER_SIZE = 8 * CUSTOM_BLOCK_BUFFER_SIZE;
|
|
||||||
private static final int THREAD_SLEEP_10_SECONDS = 10;
|
|
||||||
private static final int TIMEOUT_30_SECONDS = 30000;
|
|
||||||
private byte[] defBuffer = new byte[ONE_MB];
|
private byte[] defBuffer = new byte[ONE_MB];
|
||||||
|
|
||||||
public ITestAbfsInputStreamStatistics() throws Exception {
|
public ITestAbfsInputStreamStatistics() throws Exception {
|
||||||
|
@ -295,8 +292,8 @@ public class ITestAbfsInputStreamStatistics
|
||||||
/**
|
/**
|
||||||
* Testing readAhead counters in AbfsInputStream with 30 seconds timeout.
|
* Testing readAhead counters in AbfsInputStream with 30 seconds timeout.
|
||||||
*/
|
*/
|
||||||
@Test(timeout = TIMEOUT_30_SECONDS)
|
@Test
|
||||||
public void testReadAheadCounters() throws IOException, InterruptedException {
|
public void testReadAheadCounters() throws IOException {
|
||||||
describe("Test to check correct values for readAhead counters in "
|
describe("Test to check correct values for readAhead counters in "
|
||||||
+ "AbfsInputStream");
|
+ "AbfsInputStream");
|
||||||
|
|
||||||
|
@ -334,18 +331,6 @@ public class ITestAbfsInputStreamStatistics
|
||||||
AbfsInputStreamStatisticsImpl stats =
|
AbfsInputStreamStatisticsImpl stats =
|
||||||
(AbfsInputStreamStatisticsImpl) in.getStreamStatistics();
|
(AbfsInputStreamStatisticsImpl) in.getStreamStatistics();
|
||||||
|
|
||||||
/*
|
|
||||||
* Since, readAhead is done in background threads. Sometimes, the
|
|
||||||
* threads aren't finished in the background and could result in
|
|
||||||
* inaccurate results. So, we wait till we have the accurate values
|
|
||||||
* with a limit of 30 seconds as that's when the test times out.
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
while (stats.getRemoteBytesRead() < CUSTOM_READ_AHEAD_BUFFER_SIZE
|
|
||||||
|| stats.getReadAheadBytesRead() < CUSTOM_BLOCK_BUFFER_SIZE) {
|
|
||||||
Thread.sleep(THREAD_SLEEP_10_SECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Verifying the counter values of readAheadBytesRead and remoteBytesRead.
|
* Verifying the counter values of readAheadBytesRead and remoteBytesRead.
|
||||||
*
|
*
|
||||||
|
@ -353,27 +338,28 @@ public class ITestAbfsInputStreamStatistics
|
||||||
* from 0 to 5KB in the file. The bufferSize is set to 4KB, and since
|
* from 0 to 5KB in the file. The bufferSize is set to 4KB, and since
|
||||||
* we have 8 blocks of readAhead buffer. We would have 8 blocks of 4KB
|
* we have 8 blocks of readAhead buffer. We would have 8 blocks of 4KB
|
||||||
* buffer. Our read is till 5KB, hence readAhead would ideally read 2
|
* buffer. Our read is till 5KB, hence readAhead would ideally read 2
|
||||||
* blocks of 4KB which is equal to 8KB. But, sometimes to get more than
|
* blocks of 4KB which is equal to 8KB. But, sometimes to get blocks
|
||||||
* one block from readAhead buffer we might have to wait for background
|
* from readAhead buffer we might have to wait for background
|
||||||
* threads to fill the buffer and hence we might do remote read which
|
* threads to fill the buffer and hence we might do remote read which
|
||||||
* would be faster. Therefore, readAheadBytesRead would be equal to or
|
* would be faster. Therefore, readAheadBytesRead would be greater than
|
||||||
* greater than 4KB.
|
* or equal to the value of bytesFromReadAhead at the point we measure it.
|
||||||
*
|
*
|
||||||
* remoteBytesRead : Since, the bufferSize is set to 4KB and the number
|
* remoteBytesRead : Since, the bufferSize is set to 4KB and the number
|
||||||
* of blocks or readAheadQueueDepth is equal to 8. We would read 8 * 4
|
* of blocks or readAheadQueueDepth is equal to 8. We would read 8 * 4
|
||||||
* KB buffer on the first read, which is equal to 32KB. But, if we are not
|
* KB buffer on the first read, which is equal to 32KB. But, if we are not
|
||||||
* able to read some bytes that were in the buffer after doing
|
* able to read some bytes that were in the buffer after doing
|
||||||
* readAhead, we might use remote read again. Thus, the bytes read
|
* readAhead, we might use remote read again. Thus, the bytes read
|
||||||
* remotely could also be greater than 32Kb.
|
* remotely would be greater than or equal to the bytesFromRemoteRead
|
||||||
|
* value that we measure at some point of the operation.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
Assertions.assertThat(stats.getReadAheadBytesRead()).describedAs(
|
Assertions.assertThat(stats.getReadAheadBytesRead()).describedAs(
|
||||||
"Mismatch in readAheadBytesRead counter value")
|
"Mismatch in readAheadBytesRead counter value")
|
||||||
.isGreaterThanOrEqualTo(CUSTOM_BLOCK_BUFFER_SIZE);
|
.isGreaterThanOrEqualTo(in.getBytesFromReadAhead());
|
||||||
|
|
||||||
Assertions.assertThat(stats.getRemoteBytesRead()).describedAs(
|
Assertions.assertThat(stats.getRemoteBytesRead()).describedAs(
|
||||||
"Mismatch in remoteBytesRead counter value")
|
"Mismatch in remoteBytesRead counter value")
|
||||||
.isGreaterThanOrEqualTo(CUSTOM_READ_AHEAD_BUFFER_SIZE);
|
.isGreaterThanOrEqualTo(in.getBytesFromRemoteRead());
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.cleanupWithLogger(LOG, out, in);
|
IOUtils.cleanupWithLogger(LOG, out, in);
|
||||||
|
|
Loading…
Reference in New Issue