HADOOP-18186. s3a prefetching to use SemaphoredDelegatingExecutor for submitting work (#4796)
Contributed by Viraj Jasani
This commit is contained in:
parent
4a01fadb94
commit
56387cce57
|
@ -766,9 +766,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
||||||
DEFAULT_KEEPALIVE_TIME, 0);
|
DEFAULT_KEEPALIVE_TIME, 0);
|
||||||
int numPrefetchThreads = this.prefetchEnabled ? this.prefetchBlockCount : 0;
|
int numPrefetchThreads = this.prefetchEnabled ? this.prefetchBlockCount : 0;
|
||||||
|
|
||||||
|
int activeTasksForBoundedThreadPool = maxThreads;
|
||||||
|
int waitingTasksForBoundedThreadPool = maxThreads + totalTasks + numPrefetchThreads;
|
||||||
boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
|
boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
|
||||||
maxThreads,
|
activeTasksForBoundedThreadPool,
|
||||||
maxThreads + totalTasks + numPrefetchThreads,
|
waitingTasksForBoundedThreadPool,
|
||||||
keepAliveTime, TimeUnit.SECONDS,
|
keepAliveTime, TimeUnit.SECONDS,
|
||||||
name + "-bounded");
|
name + "-bounded");
|
||||||
unboundedThreadPool = new ThreadPoolExecutor(
|
unboundedThreadPool = new ThreadPoolExecutor(
|
||||||
|
@ -780,8 +782,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
||||||
unboundedThreadPool.allowCoreThreadTimeOut(true);
|
unboundedThreadPool.allowCoreThreadTimeOut(true);
|
||||||
executorCapacity = intOption(conf,
|
executorCapacity = intOption(conf,
|
||||||
EXECUTOR_CAPACITY, DEFAULT_EXECUTOR_CAPACITY, 1);
|
EXECUTOR_CAPACITY, DEFAULT_EXECUTOR_CAPACITY, 1);
|
||||||
if (this.prefetchEnabled) {
|
if (prefetchEnabled) {
|
||||||
this.futurePool = new ExecutorServiceFuturePool(boundedThreadPool);
|
final S3AInputStreamStatistics s3AInputStreamStatistics =
|
||||||
|
statisticsContext.newInputStreamStatistics();
|
||||||
|
futurePool = new ExecutorServiceFuturePool(
|
||||||
|
new SemaphoredDelegatingExecutor(
|
||||||
|
boundedThreadPool,
|
||||||
|
activeTasksForBoundedThreadPool + waitingTasksForBoundedThreadPool,
|
||||||
|
true,
|
||||||
|
s3AInputStreamStatistics));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -38,8 +38,10 @@ import org.apache.hadoop.fs.statistics.StreamStatisticNames;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
|
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
|
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
|
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
|
||||||
|
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum;
|
||||||
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
|
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
|
||||||
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
|
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
|
||||||
|
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX;
|
||||||
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
|
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -130,6 +132,8 @@ public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
|
||||||
}
|
}
|
||||||
// Verify that once stream is closed, all memory is freed
|
// Verify that once stream is closed, all memory is freed
|
||||||
verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
|
verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
|
||||||
|
assertThatStatisticMaximum(ioStats,
|
||||||
|
StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -159,6 +163,8 @@ public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
|
||||||
}
|
}
|
||||||
verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, 0);
|
verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, 0);
|
||||||
verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
|
verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
|
||||||
|
assertThatStatisticMaximum(ioStats,
|
||||||
|
StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -183,6 +189,9 @@ public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
|
||||||
verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, 0);
|
verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, 0);
|
||||||
// The buffer pool is not used
|
// The buffer pool is not used
|
||||||
verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
|
verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
|
||||||
|
// no prefetch ops, so no action_executor_acquired
|
||||||
|
assertThatStatisticMaximum(ioStats,
|
||||||
|
StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isEqualTo(-1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue