mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-18 19:05:06 +00:00
By forking off the `SAME` pool tasks and executing them in random order, we are actually creating unrealisticc scenarios and missing the actual order of operations (whatever task that puts the task on the `SAME` queue will always run before the `SAME` queued task will be executed currently). Also, added caching for the executors. It doesn't matter much, but saves some objects and makes debugging a little easier because executor object ids make more sense.
This commit is contained in:
parent
84a2f1adf2
commit
31a84b17ad
@ -24,6 +24,7 @@ import org.apache.logging.log4j.LogManager;
|
|||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.threadpool.ThreadPoolInfo;
|
import org.elasticsearch.threadpool.ThreadPoolInfo;
|
||||||
import org.elasticsearch.threadpool.ThreadPoolStats;
|
import org.elasticsearch.threadpool.ThreadPoolStats;
|
||||||
@ -288,6 +289,7 @@ public class DeterministicTaskQueue {
|
|||||||
* @return A <code>ThreadPool</code> that uses this task queue and wraps <code>Runnable</code>s in the given wrapper.
|
* @return A <code>ThreadPool</code> that uses this task queue and wraps <code>Runnable</code>s in the given wrapper.
|
||||||
*/
|
*/
|
||||||
public ThreadPool getThreadPool(Function<Runnable, Runnable> runnableWrapper) {
|
public ThreadPool getThreadPool(Function<Runnable, Runnable> runnableWrapper) {
|
||||||
|
final ExecutorService forkingExecutor = getExecutorService(runnableWrapper);
|
||||||
return new ThreadPool(settings) {
|
return new ThreadPool(settings) {
|
||||||
|
|
||||||
private final Map<String, ThreadPool.Info> infos = new HashMap<>();
|
private final Map<String, ThreadPool.Info> infos = new HashMap<>();
|
||||||
@ -323,12 +325,12 @@ public class DeterministicTaskQueue {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ExecutorService generic() {
|
public ExecutorService generic() {
|
||||||
return getExecutorService(runnableWrapper);
|
return executor(Names.GENERIC);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ExecutorService executor(String name) {
|
public ExecutorService executor(String name) {
|
||||||
return getExecutorService(runnableWrapper);
|
return Names.SAME.equals(name) ? EsExecutors.newDirectExecutorService() : forkingExecutor;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -421,6 +421,20 @@ public class DeterministicTaskQueueTests extends ESTestCase {
|
|||||||
assertThat(strings, contains("periodic-0", "periodic-1", "periodic-2"));
|
assertThat(strings, contains("periodic-0", "periodic-1", "periodic-2"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testSameExecutor() {
|
||||||
|
final DeterministicTaskQueue taskQueue = newTaskQueue();
|
||||||
|
final ThreadPool threadPool = taskQueue.getThreadPool();
|
||||||
|
final AtomicBoolean executed = new AtomicBoolean(false);
|
||||||
|
final AtomicBoolean executedNested = new AtomicBoolean(false);
|
||||||
|
threadPool.generic().execute(() -> {
|
||||||
|
threadPool.executor(ThreadPool.Names.SAME).execute(() -> executedNested.set(true));
|
||||||
|
assertThat(executedNested.get(), is(true));
|
||||||
|
executed.set(true);
|
||||||
|
});
|
||||||
|
taskQueue.runAllRunnableTasks();
|
||||||
|
assertThat(executed.get(), is(true));
|
||||||
|
}
|
||||||
|
|
||||||
static DeterministicTaskQueue newTaskQueue() {
|
static DeterministicTaskQueue newTaskQueue() {
|
||||||
return newTaskQueue(random());
|
return newTaskQueue(random());
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user