diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java index ca755f08419..5a55ca522d6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java @@ -387,6 +387,46 @@ public final class StreamStatisticNames { public static final String BLOCKS_RELEASED = "blocks_released"; + /** + * Total number of prefetching operations executed. + */ + public static final String STREAM_READ_PREFETCH_OPERATIONS + = "stream_read_prefetch_operations"; + + /** + * Total number of block in disk cache. + */ + public static final String STREAM_READ_BLOCKS_IN_FILE_CACHE + = "stream_read_blocks_in_cache"; + + /** + * Total number of active prefetch operations. + */ + public static final String STREAM_READ_ACTIVE_PREFETCH_OPERATIONS + = "stream_read_active_prefetch_operations"; + + /** + * Total bytes of memory in use by this input stream. + */ + public static final String STREAM_READ_ACTIVE_MEMORY_IN_USE + = "stream_read_active_memory_in_use"; + + /** + * count/duration of reading a remote block. + * + * Value: {@value}. + */ + public static final String STREAM_READ_REMOTE_BLOCK_READ + = "stream_read_block_read"; + + /** + * count/duration of acquiring a buffer and reading to it. + * + * Value: {@value}. + */ + public static final String STREAM_READ_BLOCK_ACQUIRE_AND_READ + = "stream_read_block_acquire_read"; + private StreamStatisticNames() { } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java index b151ed439af..bd7da11ddd8 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java @@ -31,6 +31,8 @@ import java.util.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.util.Objects.requireNonNull; + /** * Manages a fixed pool of {@code ByteBuffer} instances. * @@ -56,26 +58,32 @@ public class BufferPool implements Closeable { // Allows associating metadata to each buffer in the pool. private Map allocated; + private PrefetchingStatistics prefetchingStatistics; + /** * Initializes a new instance of the {@code BufferPool} class. * * @param size number of buffer in this pool. * @param bufferSize size in bytes of each buffer. + * @param prefetchingStatistics statistics for this stream. * * @throws IllegalArgumentException if size is zero or negative. * @throws IllegalArgumentException if bufferSize is zero or negative. */ - public BufferPool(int size, int bufferSize) { + public BufferPool(int size, int bufferSize, PrefetchingStatistics prefetchingStatistics) { Validate.checkPositiveInteger(size, "size"); Validate.checkPositiveInteger(bufferSize, "bufferSize"); this.size = size; this.bufferSize = bufferSize; this.allocated = new IdentityHashMap(); + this.prefetchingStatistics = requireNonNull(prefetchingStatistics); this.pool = new BoundedResourcePool(size) { @Override public ByteBuffer createNew() { - return ByteBuffer.allocate(bufferSize); + ByteBuffer buffer = ByteBuffer.allocate(bufferSize); + prefetchingStatistics.memoryAllocated(bufferSize); + return buffer; } }; } @@ -236,11 +244,15 @@ public class BufferPool implements Closeable { } } - this.pool.close(); - this.pool = null; + int currentPoolSize = pool.numCreated(); - this.allocated.clear(); - this.allocated = null; + pool.close(); + pool = null; + + allocated.clear(); + allocated = null; + + prefetchingStatistics.memoryFreed(currentPoolSize * bufferSize); } // For debugging purposes. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java index 1bb439a9997..1207d3d0318 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java @@ -21,6 +21,8 @@ package org.apache.hadoop.fs.common; import java.io.IOException; import java.nio.ByteBuffer; +import java.time.Duration; +import java.time.Instant; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -31,6 +33,10 @@ import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.fs.statistics.DurationTracker; + +import static java.util.Objects.requireNonNull; + import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; /** @@ -70,33 +76,37 @@ public abstract class CachingBlockManager extends BlockManager { // Once set to true, any further caching requests will be ignored. private final AtomicBoolean cachingDisabled; + private final PrefetchingStatistics prefetchingStatistics; + /** * Constructs an instance of a {@code CachingBlockManager}. * * @param futurePool asynchronous tasks are performed in this pool. * @param blockData information about each block of the underlying file. * @param bufferPoolSize size of the in-memory cache in terms of number of blocks. + * @param prefetchingStatistics statistics for this stream. * - * @throws IllegalArgumentException if futurePool is null. * @throws IllegalArgumentException if bufferPoolSize is zero or negative. */ public CachingBlockManager( ExecutorServiceFuturePool futurePool, BlockData blockData, - int bufferPoolSize) { + int bufferPoolSize, + PrefetchingStatistics prefetchingStatistics) { super(blockData); - Validate.checkNotNull(futurePool, "futurePool"); Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize"); - this.futurePool = futurePool; + this.futurePool = requireNonNull(futurePool); this.bufferPoolSize = bufferPoolSize; this.numCachingErrors = new AtomicInteger(); this.numReadErrors = new AtomicInteger(); this.cachingDisabled = new AtomicBoolean(); + this.prefetchingStatistics = requireNonNull(prefetchingStatistics); if (this.getBlockData().getFileSize() > 0) { - this.bufferPool = new BufferPool(bufferPoolSize, this.getBlockData().getBlockSize()); + this.bufferPool = new BufferPool(bufferPoolSize, this.getBlockData().getBlockSize(), + this.prefetchingStatistics); this.cache = this.createCache(); } @@ -249,7 +259,7 @@ public abstract class CachingBlockManager extends BlockManager { } BlockOperations.Operation op = this.ops.requestPrefetch(blockNumber); - PrefetchTask prefetchTask = new PrefetchTask(data, this); + PrefetchTask prefetchTask = new PrefetchTask(data, this, Instant.now()); Future prefetchFuture = this.futurePool.executeFunction(prefetchTask); data.setPrefetch(prefetchFuture); this.ops.end(op); @@ -279,8 +289,10 @@ public abstract class CachingBlockManager extends BlockManager { } } - private void prefetch(BufferData data) throws IOException { + private void prefetch(BufferData data, Instant taskQueuedStartTime) throws IOException { synchronized (data) { + prefetchingStatistics.executorAcquired( + Duration.between(taskQueuedStartTime, Instant.now())); this.readBlock( data, true, @@ -297,6 +309,7 @@ public abstract class CachingBlockManager extends BlockManager { } BlockOperations.Operation op = null; + DurationTracker tracker = null; synchronized (data) { try { @@ -318,6 +331,7 @@ public abstract class CachingBlockManager extends BlockManager { } if (isPrefetch) { + tracker = prefetchingStatistics.prefetchOperationStarted(); op = this.ops.prefetch(data.getBlockNumber()); } else { op = this.ops.getRead(data.getBlockNumber()); @@ -333,6 +347,11 @@ public abstract class CachingBlockManager extends BlockManager { } catch (Exception e) { String message = String.format("error during readBlock(%s)", data.getBlockNumber()); LOG.error(message, e); + + if (isPrefetch && tracker != null) { + tracker.failed(); + } + this.numReadErrors.incrementAndGet(); data.setDone(); throw e; @@ -340,6 +359,13 @@ public abstract class CachingBlockManager extends BlockManager { if (op != null) { this.ops.end(op); } + + if (isPrefetch) { + prefetchingStatistics.prefetchOperationCompleted(); + if (tracker != null) { + tracker.close(); + } + } } } } @@ -350,16 +376,18 @@ public abstract class CachingBlockManager extends BlockManager { private static class PrefetchTask implements Supplier { private final BufferData data; private final CachingBlockManager blockManager; + private final Instant taskQueuedStartTime; - PrefetchTask(BufferData data, CachingBlockManager blockManager) { + PrefetchTask(BufferData data, CachingBlockManager blockManager, Instant taskQueuedStartTime) { this.data = data; this.blockManager = blockManager; + this.taskQueuedStartTime = taskQueuedStartTime; } @Override public Void get() { try { - this.blockManager.prefetch(data); + this.blockManager.prefetch(data, taskQueuedStartTime); } catch (Exception e) { LOG.error("error during prefetch", e); } @@ -420,14 +448,18 @@ public abstract class CachingBlockManager extends BlockManager { blockFuture = cf; } - CachePutTask task = new CachePutTask(data, blockFuture, this); + CachePutTask task = new CachePutTask(data, blockFuture, this, Instant.now()); Future actionFuture = this.futurePool.executeFunction(task); data.setCaching(actionFuture); this.ops.end(op); } } - private void addToCacheAndRelease(BufferData data, Future blockFuture) { + private void addToCacheAndRelease(BufferData data, Future blockFuture, + Instant taskQueuedStartTime) { + prefetchingStatistics.executorAcquired( + Duration.between(taskQueuedStartTime, Instant.now())); + if (this.closed) { return; } @@ -493,7 +525,7 @@ public abstract class CachingBlockManager extends BlockManager { } protected BlockCache createCache() { - return new SingleFilePerBlockCache(); + return new SingleFilePerBlockCache(prefetchingStatistics); } protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException { @@ -513,18 +545,22 @@ public abstract class CachingBlockManager extends BlockManager { // Block manager that manages this block. private final CachingBlockManager blockManager; + private final Instant taskQueuedStartTime; + CachePutTask( BufferData data, Future blockFuture, - CachingBlockManager blockManager) { + CachingBlockManager blockManager, + Instant taskQueuedStartTime) { this.data = data; this.blockFuture = blockFuture; this.blockManager = blockManager; + this.taskQueuedStartTime = taskQueuedStartTime; } @Override public Void get() { - this.blockManager.addToCacheAndRelease(this.data, this.blockFuture); + this.blockManager.addToCacheAndRelease(this.data, this.blockFuture, taskQueuedStartTime); return null; } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/PrefetchingStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/PrefetchingStatistics.java new file mode 100644 index 00000000000..b1894f97696 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/PrefetchingStatistics.java @@ -0,0 +1,67 @@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.fs.common; + +import java.time.Duration; + +import org.apache.hadoop.fs.statistics.DurationTracker; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; + +public interface PrefetchingStatistics extends IOStatisticsSource { + + /** + * A prefetch operation has started. + * @return duration tracker + */ + DurationTracker prefetchOperationStarted(); + + /** + * A block has been saved to the file cache. + */ + void blockAddedToFileCache(); + + /** + * A block has been removed from the file cache. + */ + void blockRemovedFromFileCache(); + + /** + * A prefetch operation has completed. + */ + void prefetchOperationCompleted(); + + /** + * An executor has been acquired, either for prefetching or caching. + * @param timeInQueue time taken to acquire an executor. + */ + void executorAcquired(Duration timeInQueue); + + /** + * A new buffer has been added to the buffer pool. + * @param size size of the new buffer + */ + void memoryAllocated(int size); + + /** + * Previously allocated memory has been freed. + * @param size size of memory freed. + */ + void memoryFreed(int size); +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/SingleFilePerBlockCache.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/SingleFilePerBlockCache.java index 0f3d59b6cb9..7252c294bee 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/SingleFilePerBlockCache.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/SingleFilePerBlockCache.java @@ -42,6 +42,8 @@ import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.util.Objects.requireNonNull; + /** * Provides functionality necessary for caching blocks of data read from FileSystem. * Each cache block is stored on the local disk as a separate file. @@ -58,6 +60,8 @@ public class SingleFilePerBlockCache implements BlockCache { private boolean closed; + private final PrefetchingStatistics prefetchingStatistics; + // Cache entry. // Each block is stored as a separate file. private static class Entry { @@ -81,7 +85,13 @@ public class SingleFilePerBlockCache implements BlockCache { } } - public SingleFilePerBlockCache() { + /** + * Constructs an instance of a {@code SingleFilePerBlockCache}. + * + * @param prefetchingStatistics statistics for this stream. + */ + public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics) { + this.prefetchingStatistics = requireNonNull(prefetchingStatistics); } /** @@ -184,6 +194,7 @@ public class SingleFilePerBlockCache implements BlockCache { } this.writeFile(blockFilePath, buffer); + this.prefetchingStatistics.blockAddedToFileCache(); long checksum = BufferData.getChecksum(buffer); Entry entry = new Entry(blockNumber, blockFilePath, buffer.limit(), checksum); this.blocks.put(blockNumber, entry); @@ -221,6 +232,7 @@ public class SingleFilePerBlockCache implements BlockCache { for (Entry entry : this.blocks.values()) { try { Files.deleteIfExists(entry.path); + this.prefetchingStatistics.blockRemovedFromFileCache(); numFilesDeleted++; } catch (IOException e) { // Ignore while closing so that we can delete as many cache files as possible. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index 67734b75029..53b47e5a79c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -837,11 +837,19 @@ public class S3AInstrumentation implements Closeable, MetricsSource, StreamStatisticNames.STREAM_READ_TOTAL_BYTES, StreamStatisticNames.STREAM_READ_UNBUFFERED, StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES) - .withGauges(STREAM_READ_GAUGE_INPUT_POLICY) + .withGauges(STREAM_READ_GAUGE_INPUT_POLICY, + STREAM_READ_BLOCKS_IN_FILE_CACHE.getSymbol(), + STREAM_READ_ACTIVE_PREFETCH_OPERATIONS.getSymbol(), + STREAM_READ_ACTIVE_MEMORY_IN_USE.getSymbol() + ) .withDurationTracking(ACTION_HTTP_GET_REQUEST, + ACTION_EXECUTOR_ACQUIRED, StoreStatisticNames.ACTION_FILE_OPENED, StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED, - StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED) + StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED, + StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, + StreamStatisticNames.STREAM_READ_REMOTE_BLOCK_READ, + StreamStatisticNames.STREAM_READ_BLOCK_ACQUIRE_AND_READ) .build(); setIOStatistics(st); aborted = st.getCounterReference( @@ -902,6 +910,18 @@ public class S3AInstrumentation implements Closeable, MetricsSource, return incCounter(name, value); } + /** + * Increment the Statistic gauge and the local IOStatistics + * equivalent. + * @param statistic statistic + * @param v value. + * @return local IOStatistic value + */ + private long incAllGauges(Statistic statistic, long v) { + incrementGauge(statistic, v); + return incGauge(statistic.getSymbol(), v); + } + /** * {@inheritDoc}. * Increments the number of seek operations, @@ -1017,6 +1037,12 @@ public class S3AInstrumentation implements Closeable, MetricsSource, } } + @Override + public void executorAcquired(Duration timeInQueue) { + // update the duration fields in the IOStatistics. + localIOStatistics().addTimedOperation(ACTION_EXECUTOR_ACQUIRED, timeInQueue); + } + /** * {@code close()} merges the stream statistics into the filesystem's * instrumentation instance. @@ -1281,6 +1307,37 @@ public class S3AInstrumentation implements Closeable, MetricsSource, ? StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED : StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED); } + + @Override + public DurationTracker prefetchOperationStarted() { + incAllGauges(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, 1); + return trackDuration(StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS); + } + + @Override + public void blockAddedToFileCache() { + incAllGauges(STREAM_READ_BLOCKS_IN_FILE_CACHE, 1); + } + + @Override + public void blockRemovedFromFileCache() { + incAllGauges(STREAM_READ_BLOCKS_IN_FILE_CACHE, -1); + } + + @Override + public void prefetchOperationCompleted() { + incAllGauges(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, -1); + } + + @Override + public void memoryAllocated(int size) { + incAllGauges(STREAM_READ_ACTIVE_MEMORY_IN_USE, size); + } + + @Override + public void memoryFreed(int size) { + incAllGauges(STREAM_READ_ACTIVE_MEMORY_IN_USE, -size); + } } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java index dfe9fdf2d8d..66ff4f97207 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java @@ -361,6 +361,18 @@ public enum Statistic { StreamStatisticNames.STREAM_READ_TOTAL_BYTES, "Total count of bytes read from an input stream", TYPE_COUNTER), + STREAM_READ_BLOCKS_IN_FILE_CACHE( + StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, + "Gauge of blocks in disk cache", + TYPE_GAUGE), + STREAM_READ_ACTIVE_PREFETCH_OPERATIONS( + StreamStatisticNames.STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, + "Gauge of active prefetches", + TYPE_GAUGE), + STREAM_READ_ACTIVE_MEMORY_IN_USE( + StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, + "Gauge of active memory in use", + TYPE_GAUGE), /* Stream Write statistics */ diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java index 674a5ccbdd8..1c058087f31 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.common.BlockData; import org.apache.hadoop.fs.common.CachingBlockManager; import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.apache.hadoop.fs.common.Validate; +import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; /** * Provides access to S3 file one block at a time. @@ -48,6 +49,7 @@ public class S3CachingBlockManager extends CachingBlockManager { * @param reader reader that reads from S3 file. * @param blockData information about each block of the S3 file. * @param bufferPoolSize size of the in-memory cache in terms of number of blocks. + * @param streamStatistics statistics for this stream. * * @throws IllegalArgumentException if reader is null. */ @@ -55,8 +57,9 @@ public class S3CachingBlockManager extends CachingBlockManager { ExecutorServiceFuturePool futurePool, S3Reader reader, BlockData blockData, - int bufferPoolSize) { - super(futurePool, blockData, bufferPoolSize); + int bufferPoolSize, + S3AInputStreamStatistics streamStatistics) { + super(futurePool, blockData, bufferPoolSize, streamStatistics); Validate.checkNotNull(reader, "reader"); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java index b6c6bf39988..b00119ac4e1 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java @@ -33,6 +33,9 @@ import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_ACQUIRE_AND_READ; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration; + /** * Provides an {@code InputStream} that allows reading from an S3 file. * Prefetched blocks are cached to local disk if a seek away from the @@ -120,8 +123,10 @@ public class S3CachingInputStream extends S3InputStream { @Override public void close() throws IOException { - super.close(); + // Close the BlockManager first, cancelling active prefetches, + // deleting cached files and freeing memory used by buffer pool. this.blockManager.close(); + super.close(); LOG.info("closed: {}", this.getName()); } @@ -171,7 +176,10 @@ public class S3CachingInputStream extends S3InputStream { } } - BufferData data = this.blockManager.get(toBlockNumber); + BufferData data = invokeTrackingDuration( + this.getS3AStreamStatistics().trackDuration(STREAM_READ_BLOCK_ACQUIRE_AND_READ), + () -> this.blockManager.get(toBlockNumber)); + this.getFilePosition().setData(data, startOffset, readPos); return true; } @@ -193,6 +201,7 @@ public class S3CachingInputStream extends S3InputStream { S3Reader reader, BlockData blockData, int bufferPoolSize) { - return new S3CachingBlockManager(futurePool, reader, blockData, bufferPoolSize); + return new S3CachingBlockManager(futurePool, reader, blockData, bufferPoolSize, + this.getS3AStreamStatistics()); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3Reader.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3Reader.java index 89e3618be53..19ab4f6961d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3Reader.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3Reader.java @@ -31,6 +31,10 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.common.Validate; import org.apache.hadoop.fs.s3a.Invoker; +import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; + +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_REMOTE_BLOCK_READ; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation; /** * Provides functionality to read S3 file one block at a time. @@ -47,6 +51,8 @@ public class S3Reader implements Closeable { // Set to true by close(). private volatile boolean closed; + private final S3AInputStreamStatistics streamStatistics; + /** * Constructs an instance of {@link S3Reader}. * @@ -58,6 +64,7 @@ public class S3Reader implements Closeable { Validate.checkNotNull(s3File, "s3File"); this.s3File = s3File; + this.streamStatistics = this.s3File.getStatistics(); } /** @@ -95,26 +102,24 @@ public class S3Reader implements Closeable { private int readOneBlockWithRetries(ByteBuffer buffer, long offset, int size) throws IOException { - this.s3File.getStatistics().readOperationStarted(offset, size); + this.streamStatistics.readOperationStarted(offset, size); Invoker invoker = this.s3File.getReadInvoker(); - int invokerResponse = invoker.retry( - "read", this.s3File.getPath(), true, - () -> { + int invokerResponse = invoker.retry("read", this.s3File.getPath(), true, + trackDurationOfOperation(streamStatistics, STREAM_READ_REMOTE_BLOCK_READ, () -> { try { this.readOneBlock(buffer, offset, size); } catch (EOFException e) { // the base implementation swallows EOFs. return -1; } catch (SocketTimeoutException e) { - this.s3File.getStatistics().readException(); throw e; } catch (IOException e) { this.s3File.getStatistics().readException(); throw e; } return 0; - }); + })); int numBytesRead = buffer.position(); buffer.limit(numBytesRead); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java index 539af2bde36..e74a6d59a86 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.s3a.statistics; +import org.apache.hadoop.fs.common.PrefetchingStatistics; import org.apache.hadoop.fs.statistics.DurationTracker; /** @@ -26,7 +27,7 @@ import org.apache.hadoop.fs.statistics.DurationTracker; * It also contains getters for tests. */ public interface S3AInputStreamStatistics extends AutoCloseable, - S3AStatisticInterface { + S3AStatisticInterface, PrefetchingStatistics { /** * Seek backwards, incrementing the seek and backward seek counters. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java index 5c0995e41b3..af84cfca779 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java @@ -210,6 +210,41 @@ public final class EmptyS3AStatisticsContext implements S3AStatisticsContext { } + @Override + public DurationTracker prefetchOperationStarted() { + return stubDurationTracker(); + } + + @Override + public void prefetchOperationCompleted() { + + } + + @Override + public void blockAddedToFileCache() { + + } + + @Override + public void blockRemovedFromFileCache() { + + } + + @Override + public void executorAcquired(Duration timeInQueue) { + + } + + @Override + public void memoryAllocated(int size) { + + } + + @Override + public void memoryFreed(int size) { + + } + /** * Return an IO statistics instance. * @return an empty IO statistics instance. @@ -343,6 +378,7 @@ public final class EmptyS3AStatisticsContext implements S3AStatisticsContext { public DurationTracker initiateInnerStreamClose(final boolean abort) { return stubDurationTracker(); } + } /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/EmptyPrefetchingStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/EmptyPrefetchingStatistics.java new file mode 100644 index 00000000000..ce13d2d9929 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/EmptyPrefetchingStatistics.java @@ -0,0 +1,76 @@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.fs.common; + +import java.time.Duration; + +import org.apache.hadoop.fs.statistics.DurationTracker; + +import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTracker; + +public final class EmptyPrefetchingStatistics implements PrefetchingStatistics { + + private static final EmptyPrefetchingStatistics EMPTY_PREFETCHING_STATISTICS = + new EmptyPrefetchingStatistics(); + + private EmptyPrefetchingStatistics() { + } + + public static EmptyPrefetchingStatistics getInstance() { + return EMPTY_PREFETCHING_STATISTICS; + } + + @Override + public DurationTracker prefetchOperationStarted() { + return stubDurationTracker(); + } + + @Override + public void blockAddedToFileCache() { + + } + + @Override + public void blockRemovedFromFileCache() { + + } + + @Override + public void prefetchOperationCompleted() { + + } + + @Override + public void executorAcquired(Duration timeInQueue) { + + } + + @Override + public void memoryAllocated(int size) { + + } + + @Override + public void memoryFreed(int size) { + + } + +} + diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBlockCache.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBlockCache.java index b1344c6972c..c402673a49d 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBlockCache.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBlockCache.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import org.junit.Test; +import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext; import org.apache.hadoop.test.AbstractHadoopTestBase; import static org.junit.Assert.assertEquals; @@ -38,7 +39,8 @@ public class TestBlockCache extends AbstractHadoopTestBase { @Test public void testArgChecks() throws Exception { // Should not throw. - BlockCache cache = new SingleFilePerBlockCache(); + BlockCache cache = + new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance()); ByteBuffer buffer = ByteBuffer.allocate(16); @@ -47,12 +49,17 @@ public class TestBlockCache extends AbstractHadoopTestBase { IllegalArgumentException.class, "'buffer' must not be null", () -> cache.put(42, null)); + + ExceptionAsserts.assertThrows( + NullPointerException.class, + () -> new SingleFilePerBlockCache(null)); } @Test public void testPutAndGet() throws Exception { - BlockCache cache = new SingleFilePerBlockCache(); + BlockCache cache = + new SingleFilePerBlockCache(new EmptyS3AStatisticsContext().newInputStreamStatistics()); ByteBuffer buffer1 = ByteBuffer.allocate(BUFFER_SIZE); for (byte i = 0; i < BUFFER_SIZE; i++) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferPool.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferPool.java index 43be295cb38..c9134f1e251 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferPool.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferPool.java @@ -21,6 +21,8 @@ package org.apache.hadoop.fs.common; import org.junit.Test; +import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext; import org.apache.hadoop.test.AbstractHadoopTestBase; import static org.junit.Assert.assertEquals; @@ -32,33 +34,38 @@ public class TestBufferPool extends AbstractHadoopTestBase { private static final int POOL_SIZE = 2; private static final int BUFFER_SIZE = 10; - + private final S3AInputStreamStatistics s3AInputStreamStatistics = + new EmptyS3AStatisticsContext().newInputStreamStatistics(); @Test public void testArgChecks() throws Exception { // Should not throw. - BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE); + BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE, s3AInputStreamStatistics); // Verify it throws correctly. ExceptionAsserts.assertThrows( IllegalArgumentException.class, "'size' must be a positive integer", - () -> new BufferPool(0, 10)); + () -> new BufferPool(0, 10, s3AInputStreamStatistics)); ExceptionAsserts.assertThrows( IllegalArgumentException.class, "'size' must be a positive integer", - () -> new BufferPool(-1, 10)); + () -> new BufferPool(-1, 10, s3AInputStreamStatistics)); ExceptionAsserts.assertThrows( IllegalArgumentException.class, "'bufferSize' must be a positive integer", - () -> new BufferPool(10, 0)); + () -> new BufferPool(10, 0, s3AInputStreamStatistics)); ExceptionAsserts.assertThrows( IllegalArgumentException.class, "'bufferSize' must be a positive integer", - () -> new BufferPool(1, -10)); + () -> new BufferPool(1, -10, s3AInputStreamStatistics)); + + ExceptionAsserts.assertThrows( + NullPointerException.class, + () -> new BufferPool(1, 10, null)); ExceptionAsserts.assertThrows( IllegalArgumentException.class, @@ -78,7 +85,7 @@ public class TestBufferPool extends AbstractHadoopTestBase { @Test public void testGetAndRelease() { - BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE); + BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE, s3AInputStreamStatistics); assertInitialState(pool, POOL_SIZE); int count = 0; @@ -125,7 +132,7 @@ public class TestBufferPool extends AbstractHadoopTestBase { private void testReleaseHelper(BufferData.State stateBeforeRelease, boolean expectThrow) throws Exception { - BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE); + BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE, s3AInputStreamStatistics); assertInitialState(pool, POOL_SIZE); BufferData data = this.acquire(pool, 1); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java index 9b831cb3b84..f46e93e1084 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java @@ -39,6 +39,7 @@ import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE; import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY; import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue; import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; /** @@ -104,10 +105,11 @@ public class ITestS3PrefetchingInputStream extends AbstractS3ACostTest { @Test public void testReadLargeFileFully() throws Throwable { describe("read a large file fully, uses S3CachingInputStream"); + IOStatistics ioStats; openFS(); try (FSDataInputStream in = largeFileFS.open(largeFile)) { - IOStatistics ioStats = in.getIOStatistics(); + ioStats = in.getIOStatistics(); byte[] buffer = new byte[S_1M * 10]; long bytesRead = 0; @@ -115,20 +117,29 @@ public class ITestS3PrefetchingInputStream extends AbstractS3ACostTest { while (bytesRead < largeFileSize) { in.readFully(buffer, 0, (int) Math.min(buffer.length, largeFileSize - bytesRead)); bytesRead += buffer.length; + // Blocks are fully read, no blocks should be cached + verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, + 0); } + // Assert that first block is read synchronously, following blocks are prefetched + verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, + numBlocks - 1); verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, numBlocks); verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, numBlocks); } + // Verify that once stream is closed, all memory is freed + verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); } @Test public void testRandomReadLargeFile() throws Throwable { describe("random read on a large file, uses S3CachingInputStream"); + IOStatistics ioStats; openFS(); try (FSDataInputStream in = largeFileFS.open(largeFile)) { - IOStatistics ioStats = in.getIOStatistics(); + ioStats = in.getIOStatistics(); byte[] buffer = new byte[blockSize]; @@ -141,7 +152,13 @@ public class ITestS3PrefetchingInputStream extends AbstractS3ACostTest { verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 2); verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 2); + verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, 1); + // block 0 is cached when we seek to block 1, block 1 is cached as it is being prefetched + // when we seek out of block 0, see cancelPrefetches() + verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, 2); } + verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, 0); + verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); } @Test @@ -163,6 +180,9 @@ public class ITestS3PrefetchingInputStream extends AbstractS3ACostTest { verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 1); verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 1); + verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, 0); + // The buffer pool is not used + verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java index 7e91b6830d5..d2a045e335e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java @@ -49,7 +49,6 @@ import org.apache.hadoop.fs.s3a.S3AInputPolicy; import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; -import org.apache.hadoop.fs.s3a.audit.impl.NoopSpan; import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy; import org.apache.hadoop.fs.s3a.impl.ChangeTracker; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; @@ -284,6 +283,7 @@ public final class Fakes { private final int writeDelay; public TestS3FilePerBlockCache(int readDelay, int writeDelay) { + super(new EmptyS3AStatisticsContext().newInputStreamStatistics()); this.files = new ConcurrentHashMap<>(); this.readDelay = readDelay; this.writeDelay = writeDelay; @@ -337,7 +337,8 @@ public final class Fakes { S3Reader reader, BlockData blockData, int bufferPoolSize) { - super(futurePool, reader, blockData, bufferPoolSize); + super(futurePool, reader, blockData, bufferPoolSize, + new EmptyS3AStatisticsContext().newInputStreamStatistics()); } @Override diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java index 3f84e2e0283..a9ebae276f3 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java @@ -30,6 +30,8 @@ import org.apache.hadoop.fs.common.BlockData; import org.apache.hadoop.fs.common.BufferData; import org.apache.hadoop.fs.common.ExceptionAsserts; import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; +import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext; import org.apache.hadoop.test.AbstractHadoopTestBase; import static org.junit.Assert.assertEquals; @@ -41,6 +43,8 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase { private final ExecutorService threadPool = Executors.newFixedThreadPool(4); private final ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(threadPool); + private final S3AInputStreamStatistics streamStatistics = + new EmptyS3AStatisticsContext().newInputStreamStatistics(); private final BlockData blockData = new BlockData(FILE_SIZE, BLOCK_SIZE); @@ -51,33 +55,35 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase { // Should not throw. S3CachingBlockManager blockManager = - new S3CachingBlockManager(futurePool, reader, blockData, POOL_SIZE); + new S3CachingBlockManager(futurePool, reader, blockData, POOL_SIZE, streamStatistics); // Verify it throws correctly. ExceptionAsserts.assertThrows( - IllegalArgumentException.class, - "'futurePool' must not be null", - () -> new S3CachingBlockManager(null, reader, blockData, POOL_SIZE)); + NullPointerException.class, + () -> new S3CachingBlockManager(null, reader, blockData, POOL_SIZE, streamStatistics)); ExceptionAsserts.assertThrows( IllegalArgumentException.class, "'reader' must not be null", - () -> new S3CachingBlockManager(futurePool, null, blockData, POOL_SIZE)); + () -> new S3CachingBlockManager(futurePool, null, blockData, POOL_SIZE, streamStatistics)); ExceptionAsserts.assertThrows( IllegalArgumentException.class, "'blockData' must not be null", - () -> new S3CachingBlockManager(futurePool, reader, null, POOL_SIZE)); + () -> new S3CachingBlockManager(futurePool, reader, null, POOL_SIZE, streamStatistics)); ExceptionAsserts.assertThrows( IllegalArgumentException.class, "'bufferPoolSize' must be a positive integer", - () -> new S3CachingBlockManager(futurePool, reader, blockData, 0)); + () -> new S3CachingBlockManager(futurePool, reader, blockData, 0, streamStatistics)); ExceptionAsserts.assertThrows( IllegalArgumentException.class, "'bufferPoolSize' must be a positive integer", - () -> new S3CachingBlockManager(futurePool, reader, blockData, -1)); + () -> new S3CachingBlockManager(futurePool, reader, blockData, -1, streamStatistics)); + + ExceptionAsserts.assertThrows(NullPointerException.class, + () -> new S3CachingBlockManager(futurePool, reader, blockData, POOL_SIZE, null)); ExceptionAsserts.assertThrows( IllegalArgumentException.class, @@ -108,8 +114,9 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase { ExecutorServiceFuturePool futurePool, S3Reader reader, BlockData blockData, - int bufferPoolSize) { - super(futurePool, reader, blockData, bufferPoolSize); + int bufferPoolSize, + S3AInputStreamStatistics streamStatistics) { + super(futurePool, reader, blockData, bufferPoolSize, streamStatistics); } // If true, forces the next read operation to fail. @@ -157,7 +164,7 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase { MockS3File s3File = new MockS3File(FILE_SIZE, true); S3Reader reader = new S3Reader(s3File); TestBlockManager blockManager = - new TestBlockManager(futurePool, reader, blockData, POOL_SIZE); + new TestBlockManager(futurePool, reader, blockData, POOL_SIZE, streamStatistics); for (int b = 0; b < blockData.getNumBlocks(); b++) { // We simulate caching failure for all even numbered blocks. @@ -204,7 +211,7 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase { MockS3File s3File = new MockS3File(FILE_SIZE, false); S3Reader reader = new S3Reader(s3File); TestBlockManager blockManager = - new TestBlockManager(futurePool, reader, blockData, POOL_SIZE); + new TestBlockManager(futurePool, reader, blockData, POOL_SIZE, streamStatistics); assertInitialState(blockManager); int expectedNumErrors = 0; @@ -236,7 +243,7 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase { MockS3File s3File = new MockS3File(FILE_SIZE, false); S3Reader reader = new S3Reader(s3File); S3CachingBlockManager blockManager = - new S3CachingBlockManager(futurePool, reader, blockData, POOL_SIZE); + new S3CachingBlockManager(futurePool, reader, blockData, POOL_SIZE, streamStatistics); assertInitialState(blockManager); for (int b = 0; b < blockData.getNumBlocks(); b++) { @@ -267,7 +274,7 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase { MockS3File s3File = new MockS3File(FILE_SIZE, false); S3Reader reader = new S3Reader(s3File); TestBlockManager blockManager = - new TestBlockManager(futurePool, reader, blockData, POOL_SIZE); + new TestBlockManager(futurePool, reader, blockData, POOL_SIZE, streamStatistics); assertInitialState(blockManager); int expectedNumErrors = 0;