HADOOP-18186. s3a prefetching to use SemaphoredDelegatingExecutor for submitting work (#4796)

Contributed by Viraj Jasani
This commit is contained in:
Viraj Jasani 2022-09-09 03:32:20 -07:00 committed by Steve Loughran
parent f00d77fda4
commit 1c2c6785a0
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
2 changed files with 22 additions and 4 deletions

View File

@ -786,9 +786,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
DEFAULT_KEEPALIVE_TIME, 0);
int numPrefetchThreads = this.prefetchEnabled ? this.prefetchBlockCount : 0;
int activeTasksForBoundedThreadPool = maxThreads;
int waitingTasksForBoundedThreadPool = maxThreads + totalTasks + numPrefetchThreads;
boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
maxThreads,
maxThreads + totalTasks + numPrefetchThreads,
activeTasksForBoundedThreadPool,
waitingTasksForBoundedThreadPool,
keepAliveTime, TimeUnit.SECONDS,
name + "-bounded");
unboundedThreadPool = new ThreadPoolExecutor(
@ -800,8 +802,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
unboundedThreadPool.allowCoreThreadTimeOut(true);
executorCapacity = intOption(conf,
EXECUTOR_CAPACITY, DEFAULT_EXECUTOR_CAPACITY, 1);
if (this.prefetchEnabled) {
this.futurePool = new ExecutorServiceFuturePool(boundedThreadPool);
if (prefetchEnabled) {
final S3AInputStreamStatistics s3AInputStreamStatistics =
statisticsContext.newInputStreamStatistics();
futurePool = new ExecutorServiceFuturePool(
new SemaphoredDelegatingExecutor(
boundedThreadPool,
activeTasksForBoundedThreadPool + waitingTasksForBoundedThreadPool,
true,
s3AInputStreamStatistics));
}
}

View File

@ -38,8 +38,10 @@ import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
/**
@ -130,6 +132,8 @@ public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
}
// Verify that once stream is closed, all memory is freed
verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
assertThatStatisticMaximum(ioStats,
StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0);
}
@Test
@ -159,6 +163,8 @@ public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
}
verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, 0);
verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
assertThatStatisticMaximum(ioStats,
StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0);
}
@Test
@ -183,6 +189,9 @@ public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, 0);
// The buffer pool is not used
verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
// no prefetch ops, so no action_executor_acquired
assertThatStatisticMaximum(ioStats,
StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isEqualTo(-1);
}
}