Replace Processing ExecutorService with QueryProcessingPool (#11382)

This PR refactors the code for QueryRunnerFactory#mergeRunners to accept a new interface called QueryProcessingPool instead of ExecutorService for concurrent execution of query runners. This interface will let custom extensions inject their own implementation for deciding which query-runner to prioritize first. The default implementation is the same as today that takes the priority of query into account. QueryProcessingPool can also be used as a regular executor service. It has a dedicated method for accepting query execution work so implementations can differentiate between regular async tasks and query execution tasks. This dedicated method also passes the QueryRunner object as part of the task information. This hook will let custom extensions carry any state from QuerySegmentWalker to QueryProcessingPool#mergeRunners which is not possible currently.
This commit is contained in:
Abhishek Agarwal 2021-07-01 16:03:08 +05:30 committed by GitHub
parent 906a704c55
commit 03a6a6d6e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
60 changed files with 605 additions and 265 deletions

View File

@ -104,6 +104,7 @@ import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
@ -2883,7 +2884,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
this::makeTimeseriesAndScanConglomerate,
Execs.directExecutor(), // queryExecutorService
DirectQueryProcessingPool.INSTANCE,
NoopJoinableFactory.INSTANCE,
() -> EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),

View File

@ -89,6 +89,7 @@ import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.filter.SelectorDimFilter;
@ -2970,7 +2971,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
this::makeTimeseriesOnlyConglomerate,
Execs.directExecutor(), // queryExecutorService
DirectQueryProcessingPool.INSTANCE,
NoopJoinableFactory.INSTANCE,
() -> EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),

View File

@ -46,6 +46,7 @@ import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.Monitor;
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
@ -73,7 +74,6 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
/**
* Stuff that may be needed by a Task in order to conduct its business.
@ -99,7 +99,7 @@ public class TaskToolbox
private final Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider;
@Nullable
private final Provider<MonitorScheduler> monitorSchedulerProvider;
private final ExecutorService queryExecutorService;
private final QueryProcessingPool queryProcessingPool;
private final JoinableFactory joinableFactory;
private final SegmentLoader segmentLoader;
private final ObjectMapper jsonMapper;
@ -141,7 +141,7 @@ public class TaskToolbox
DataSegmentServerAnnouncer serverAnnouncer,
SegmentHandoffNotifierFactory handoffNotifierFactory,
Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider,
ExecutorService queryExecutorService,
QueryProcessingPool queryProcessingPool,
JoinableFactory joinableFactory,
@Nullable Provider<MonitorScheduler> monitorSchedulerProvider,
SegmentLoader segmentLoader,
@ -180,7 +180,7 @@ public class TaskToolbox
this.serverAnnouncer = serverAnnouncer;
this.handoffNotifierFactory = handoffNotifierFactory;
this.queryRunnerFactoryConglomerateProvider = queryRunnerFactoryConglomerateProvider;
this.queryExecutorService = queryExecutorService;
this.queryProcessingPool = queryProcessingPool;
this.joinableFactory = joinableFactory;
this.monitorSchedulerProvider = monitorSchedulerProvider;
this.segmentLoader = segmentLoader;
@ -268,9 +268,9 @@ public class TaskToolbox
return queryRunnerFactoryConglomerateProvider.get();
}
public ExecutorService getQueryExecutorService()
public QueryProcessingPool getQueryProcessingPool()
{
return queryExecutorService;
return queryProcessingPool;
}
public JoinableFactory getJoinableFactory()

View File

@ -33,7 +33,6 @@ import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Parent;
import org.apache.druid.guice.annotations.Processing;
import org.apache.druid.guice.annotations.RemoteChatHandler;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskConfig;
@ -44,6 +43,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
@ -62,7 +62,6 @@ import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
import org.apache.druid.server.security.AuthorizerMapper;
import java.io.File;
import java.util.concurrent.ExecutorService;
/**
* Stuff that may be needed by a Task in order to conduct its business.
@ -81,7 +80,7 @@ public class TaskToolboxFactory
private final DataSegmentServerAnnouncer serverAnnouncer;
private final SegmentHandoffNotifierFactory handoffNotifierFactory;
private final Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider;
private final ExecutorService queryExecutorService;
private final QueryProcessingPool queryProcessingPool;
private final JoinableFactory joinableFactory;
private final Provider<MonitorScheduler> monitorSchedulerProvider;
private final SegmentLoaderFactory segmentLoaderFactory;
@ -122,7 +121,7 @@ public class TaskToolboxFactory
DataSegmentServerAnnouncer serverAnnouncer,
SegmentHandoffNotifierFactory handoffNotifierFactory,
Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider,
@Processing ExecutorService queryExecutorService,
QueryProcessingPool queryProcessingPool,
JoinableFactory joinableFactory,
Provider<MonitorScheduler> monitorSchedulerProvider,
SegmentLoaderFactory segmentLoaderFactory,
@ -160,7 +159,7 @@ public class TaskToolboxFactory
this.serverAnnouncer = serverAnnouncer;
this.handoffNotifierFactory = handoffNotifierFactory;
this.queryRunnerFactoryConglomerateProvider = queryRunnerFactoryConglomerateProvider;
this.queryExecutorService = queryExecutorService;
this.queryProcessingPool = queryProcessingPool;
this.joinableFactory = joinableFactory;
this.monitorSchedulerProvider = monitorSchedulerProvider;
this.segmentLoaderFactory = segmentLoaderFactory;
@ -202,7 +201,7 @@ public class TaskToolboxFactory
serverAnnouncer,
handoffNotifierFactory,
queryRunnerFactoryConglomerateProvider,
queryExecutorService,
queryProcessingPool,
joinableFactory,
monitorSchedulerProvider,
segmentLoaderFactory.manufacturate(taskWorkDir),

View File

@ -776,7 +776,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
toolbox.getQueryRunnerFactoryConglomerate(),
toolbox.getSegmentAnnouncer(),
toolbox.getEmitter(),
toolbox.getQueryExecutorService(),
toolbox.getQueryProcessingPool(),
toolbox.getJoinableFactory(),
toolbox.getCache(),
toolbox.getCacheConfig(),

View File

@ -344,7 +344,7 @@ public class RealtimeIndexTask extends AbstractTask
lockingSegmentAnnouncer,
segmentPublisher,
toolbox.getSegmentHandoffNotifierFactory(),
toolbox.getQueryExecutorService(),
toolbox.getQueryProcessingPool(),
toolbox.getJoinableFactory(),
toolbox.getIndexMergerV9(),
toolbox.getIndexIO(),

View File

@ -192,7 +192,7 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
toolbox.getQueryRunnerFactoryConglomerate(),
toolbox.getSegmentAnnouncer(),
toolbox.getEmitter(),
toolbox.getQueryExecutorService(),
toolbox.getQueryProcessingPool(),
toolbox.getJoinableFactory(),
toolbox.getCache(),
toolbox.getCacheConfig(),

View File

@ -34,6 +34,7 @@ import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
@ -62,7 +63,6 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
public class TaskToolboxTest
{
@ -81,7 +81,7 @@ public class TaskToolboxTest
private QueryRunnerFactoryConglomerate mockQueryRunnerFactoryConglomerate
= EasyMock.createMock(QueryRunnerFactoryConglomerate.class);
private MonitorScheduler mockMonitorScheduler = EasyMock.createMock(MonitorScheduler.class);
private ExecutorService mockQueryExecutorService = EasyMock.createMock(ExecutorService.class);
private QueryProcessingPool mockQueryProcessingPool = EasyMock.createMock(QueryProcessingPool.class);
private ObjectMapper ObjectMapper = new ObjectMapper();
private SegmentLoaderFactory mockSegmentLoaderFactory = EasyMock.createMock(SegmentLoaderFactory.class);
private SegmentLoaderLocalCacheManager mockSegmentLoaderLocalCacheManager = EasyMock.createMock(SegmentLoaderLocalCacheManager.class);
@ -126,7 +126,7 @@ public class TaskToolboxTest
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
mockHandoffNotifierFactory,
() -> mockQueryRunnerFactoryConglomerate,
mockQueryExecutorService,
mockQueryProcessingPool,
NoopJoinableFactory.INSTANCE,
() -> mockMonitorScheduler,
mockSegmentLoaderFactory,
@ -172,9 +172,9 @@ public class TaskToolboxTest
}
@Test
public void testGetQueryExecutorService()
public void testGetQueryProcessingPool()
{
Assert.assertEquals(mockQueryExecutorService, taskToolbox.build(task).getQueryExecutorService());
Assert.assertEquals(mockQueryProcessingPool, taskToolbox.build(task).getQueryProcessingPool());
}
@Test

View File

@ -93,6 +93,7 @@ import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
@ -1586,7 +1587,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
() -> conglomerate,
Execs.directExecutor(), // queryExecutorService
DirectQueryProcessingPool.INSTANCE, // queryExecutorService
NoopJoinableFactory.INSTANCE,
() -> EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),

View File

@ -79,6 +79,7 @@ import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
@ -985,7 +986,7 @@ public class RealtimeIndexTaskTest extends InitializedNullHandlingTest
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
() -> conglomerate,
Execs.directExecutor(), // queryExecutorService
DirectQueryProcessingPool.INSTANCE,
NoopJoinableFactory.INSTANCE,
() -> EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),

View File

@ -25,6 +25,7 @@ import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.SegmentDescriptor;
@ -43,8 +44,6 @@ import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Interval;
import java.util.concurrent.ExecutorService;
public class TestAppenderatorsManager implements AppenderatorsManager
{
private Appenderator realtimeAppenderator;
@ -62,7 +61,7 @@ public class TestAppenderatorsManager implements AppenderatorsManager
QueryRunnerFactoryConglomerate conglomerate,
DataSegmentAnnouncer segmentAnnouncer,
ServiceEmitter emitter,
ExecutorService queryExecutorService,
QueryProcessingPool queryProcessingPool,
JoinableFactory joinableFactory,
Cache cache,
CacheConfig cacheConfig,
@ -83,7 +82,7 @@ public class TestAppenderatorsManager implements AppenderatorsManager
conglomerate,
segmentAnnouncer,
emitter,
queryExecutorService,
queryProcessingPool,
joinableFactory,
cache,
cacheConfig,

View File

@ -94,7 +94,6 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
@ -105,6 +104,8 @@ import org.apache.druid.java.util.metrics.Monitor;
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.ForwardingQueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.AggregatorFactory;
@ -156,6 +157,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.net.URI;
@ -660,7 +662,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
() -> queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective
Execs.directExecutor(), // query executor service
DirectQueryProcessingPool.INSTANCE, // query executor service
NoopJoinableFactory.INSTANCE,
() -> monitorScheduler, // monitor scheduler
new SegmentLoaderFactory(null, new DefaultObjectMapper()),
@ -1336,7 +1338,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
final ExecutorService exec = Executors.newFixedThreadPool(8);
UnifiedIndexerAppenderatorsManager unifiedIndexerAppenderatorsManager = new UnifiedIndexerAppenderatorsManager(
exec,
new ForwardingQueryProcessingPool(exec),
NoopJoinableFactory.INSTANCE,
new WorkerConfig(),
MapCache.create(2048),

View File

@ -25,7 +25,6 @@ import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulator;
import org.apache.druid.guice.annotations.Processing;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.Sequence;
@ -35,6 +34,7 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryCapacityExceededException;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
@ -55,7 +55,6 @@ import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
@ -91,7 +90,7 @@ public class ServerManagerForQueryErrorTest extends ServerManager
public ServerManagerForQueryErrorTest(
QueryRunnerFactoryConglomerate conglomerate,
ServiceEmitter emitter,
@Processing ExecutorService exec,
QueryProcessingPool queryProcessingPool,
CachePopulator cachePopulator,
@Smile ObjectMapper objectMapper,
Cache cache,
@ -104,7 +103,7 @@ public class ServerManagerForQueryErrorTest extends ServerManager
super(
conglomerate,
emitter,
exec,
queryProcessingPool,
cachePopulator,
objectMapper,
cache,
@ -116,7 +115,7 @@ public class ServerManagerForQueryErrorTest extends ServerManager
}
@Override
<T> QueryRunner<T> buildQueryRunnerForSegment(
protected <T> QueryRunner<T> buildQueryRunnerForSegment(
Query<T> query,
SegmentDescriptor descriptor,
QueryRunnerFactory<T, Query<T>> factory,

View File

@ -17,20 +17,33 @@
* under the License.
*/
package org.apache.druid.guice.annotations;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
package org.apache.druid.query;
/**
* A helper class to avoid boilerplate for creating {@link PrioritizedQueryRunnerCallable} objects.
* @param <T>
* @param <V>
*/
@BindingAnnotation
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Processing
public abstract class AbstractPrioritizedQueryRunnerCallable<T, V> implements PrioritizedQueryRunnerCallable<T, V>
{
private final QueryRunner<V> runner;
private final int priority;
public AbstractPrioritizedQueryRunnerCallable(int priority, QueryRunner<V> runner)
{
this.priority = priority;
this.runner = runner;
}
@Override
public QueryRunner<V> getRunner()
{
return runner;
}
@Override
public int getPriority()
{
return priority;
}
}

View File

@ -27,7 +27,6 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
/**
* Handles caching-related tasks for a particular query type.
@ -42,7 +41,7 @@ public interface CacheStrategy<T, CacheType, QueryType extends Query<T>>
* The {@code willMergeRunners} parameter can be used for distinguishing the caller is a broker or a data node.
*
* @param query the query to be cached
* @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(ExecutorService, Iterable)} will be
* @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)} will be
* called on the cached by-segment results
*
* @return true if the query is cacheable, otherwise false.

View File

@ -25,8 +25,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.common.guava.GuavaUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
@ -36,12 +34,10 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.context.ResponseContext;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -62,28 +58,18 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
{
private static final Logger log = new Logger(ChainedExecutionQueryRunner.class);
private final QueryProcessingPool queryProcessingPool;
private final Iterable<QueryRunner<T>> queryables;
private final ListeningExecutorService exec;
private final QueryWatcher queryWatcher;
public ChainedExecutionQueryRunner(
ExecutorService exec,
QueryWatcher queryWatcher,
QueryRunner<T>... queryables
)
{
this(exec, queryWatcher, Arrays.asList(queryables));
}
public ChainedExecutionQueryRunner(
ExecutorService exec,
QueryProcessingPool queryProcessingPool,
QueryWatcher queryWatcher,
Iterable<QueryRunner<T>> queryables
)
{
// listeningDecorator will leave PrioritizedExecutorService unchanged,
// since it already implements ListeningExecutorService
this.exec = MoreExecutors.listeningDecorator(exec);
this.queryProcessingPool = queryProcessingPool;
this.queryables = Iterables.unmodifiableIterable(queryables);
this.queryWatcher = queryWatcher;
}
@ -111,8 +97,8 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
throw new ISE("Null queryRunner! Looks to be some segment unmapping action happening");
}
return exec.submit(
new AbstractPrioritizedCallable<Iterable<T>>(priority)
return queryProcessingPool.submitRunnerTask(
new AbstractPrioritizedQueryRunnerCallable<Iterable<T>, T>(priority, input)
{
@Override
public Iterable<T> call()
@ -142,8 +128,7 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
throw new RuntimeException(e);
}
}
}
);
});
}
)
);

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.google.common.util.concurrent.ForwardingListeningExecutorService;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.druid.java.util.common.concurrent.Execs;
/**
* {@link QueryProcessingPool} wrapper over {@link Execs#directExecutor()}
*/
public class DirectQueryProcessingPool extends ForwardingListeningExecutorService implements QueryProcessingPool
{
public static DirectQueryProcessingPool INSTANCE = new DirectQueryProcessingPool();
private DirectQueryProcessingPool()
{
}
@Override
public <T, V> ListenableFuture<T> submitRunnerTask(PrioritizedQueryRunnerCallable<T, V> task)
{
return delegate().submit(task);
}
@Override
public ListeningExecutorService delegate()
{
return Execs.directExecutor();
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.google.common.util.concurrent.ForwardingListeningExecutorService;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutorService;
/**
* Default implementation of {@link QueryProcessingPool} that just forwards operations, including query execution tasks,
* to an underlying {@link ExecutorService}
*/
public class ForwardingQueryProcessingPool extends ForwardingListeningExecutorService implements QueryProcessingPool
{
private final ListeningExecutorService delegate;
public ForwardingQueryProcessingPool(ExecutorService executorService)
{
this.delegate = MoreExecutors.listeningDecorator(executorService);
}
@Override
public <T, V> ListenableFuture<T> submitRunnerTask(PrioritizedQueryRunnerCallable<T, V> task)
{
return delegate().submit(task);
}
@Override
protected ListeningExecutorService delegate()
{
return delegate;
}
}

View File

@ -28,8 +28,6 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.common.guava.GuavaUtils;
import org.apache.druid.data.input.Row;
@ -51,7 +49,6 @@ import java.util.List;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -60,20 +57,20 @@ public class GroupByMergedQueryRunner<T> implements QueryRunner<T>
{
private static final Logger log = new Logger(GroupByMergedQueryRunner.class);
private final Iterable<QueryRunner<T>> queryables;
private final ListeningExecutorService exec;
private final Supplier<GroupByQueryConfig> configSupplier;
private final QueryWatcher queryWatcher;
private final NonBlockingPool<ByteBuffer> bufferPool;
private final QueryProcessingPool queryProcessingPool;
public GroupByMergedQueryRunner(
ExecutorService exec,
QueryProcessingPool queryProcessingPool,
Supplier<GroupByQueryConfig> configSupplier,
QueryWatcher queryWatcher,
NonBlockingPool<ByteBuffer> bufferPool,
Iterable<QueryRunner<T>> queryables
)
{
this.exec = MoreExecutors.listeningDecorator(exec);
this.queryProcessingPool = queryProcessingPool;
this.queryWatcher = queryWatcher;
this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
this.configSupplier = configSupplier;
@ -109,8 +106,8 @@ public class GroupByMergedQueryRunner<T> implements QueryRunner<T>
throw new ISE("Null queryRunner! Looks to be some segment unmapping action happening");
}
ListenableFuture<Void> future = exec.submit(
new AbstractPrioritizedCallable<Void>(priority)
ListenableFuture<Void> future = queryProcessingPool.submitRunnerTask(
new AbstractPrioritizedQueryRunnerCallable<Void, T>(priority, input)
{
@Override
public Void call()
@ -118,10 +115,10 @@ public class GroupByMergedQueryRunner<T> implements QueryRunner<T>
try {
if (bySegment) {
input.run(threadSafeQueryPlus, responseContext)
.accumulate(bySegmentAccumulatorPair.lhs, bySegmentAccumulatorPair.rhs);
.accumulate(bySegmentAccumulatorPair.lhs, bySegmentAccumulatorPair.rhs);
} else {
input.run(threadSafeQueryPlus, responseContext)
.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
}
return null;

View File

@ -19,53 +19,28 @@
package org.apache.druid.query;
import com.google.common.util.concurrent.ForwardingListeningExecutorService;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import java.util.concurrent.Callable;
public class MetricsEmittingExecutorService extends ForwardingListeningExecutorService
public class MetricsEmittingQueryProcessingPool extends ForwardingQueryProcessingPool
implements ExecutorServiceMonitor.MetricEmitter
{
private final ListeningExecutorService delegate;
public MetricsEmittingExecutorService(
public MetricsEmittingQueryProcessingPool(
ListeningExecutorService delegate,
ExecutorServiceMonitor executorServiceMonitor
)
{
super();
this.delegate = delegate;
super(delegate);
executorServiceMonitor.add(this);
}
@Override
protected ListeningExecutorService delegate()
{
return delegate;
}
@SuppressWarnings("ParameterPackage")
@Override
public <T> ListenableFuture<T> submit(Callable<T> tCallable)
{
return delegate.submit(tCallable);
}
@Override
public void execute(Runnable runnable)
{
delegate.execute(runnable);
}
@Override
public void emitMetrics(ServiceEmitter emitter, ServiceMetricEvent.Builder metricBuilder)
{
if (delegate instanceof PrioritizedExecutorService) {
emitter.emit(metricBuilder.build("segment/scan/pending", ((PrioritizedExecutorService) delegate).getQueueSize()));
if (delegate() instanceof PrioritizedExecutorService) {
emitter.emit(metricBuilder.build("segment/scan/pending", ((PrioritizedExecutorService) delegate()).getQueueSize()));
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
/**
* An implementation of {@link PrioritizedCallable} that also lets caller get access to associated {@link QueryRunner}
* It is used in implementations of {@link QueryRunnerFactory}
* @param <T> - Type of result of {@link #call()} method
* @param <V> - Type of {@link org.apache.druid.java.util.common.guava.Sequence} of rows returned by {@link QueryRunner}
*/
public interface PrioritizedQueryRunnerCallable<T, V> extends PrioritizedCallable<T>
{
/**
* This method can be used by the extensions to get the runner that the given query execution task corresponds to.
* That in turn can be used to fetch any state associated with the QueryRunner such as the segment info for example.
* Extensions can carry any state from custom implementation of QuerySegmentWalker to a
* custom implementation of {@link QueryProcessingPool#submitRunnerTask(PrioritizedQueryRunnerCallable)}
*/
QueryRunner<V> getRunner();
}

View File

@ -48,7 +48,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
@ExtensionPoint
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "queryType")
@ -109,7 +108,7 @@ public interface Query<T>
/**
* Comparator that represents the order in which results are generated from the
* {@link QueryRunnerFactory#createRunner(Segment)} and
* {@link QueryRunnerFactory#mergeRunners(ExecutorService, Iterable)} calls. This is used to combine streams of
* {@link QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)} calls. This is used to combine streams of
* results from different sources; for example, it's used by historicals to combine streams from different segments,
* and it's used by the broker to combine streams from different historicals.
*

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.druid.guice.annotations.ExtensionPoint;
/**
* This class implements the logic of how units of query execution run concurrently. It is used in {@link QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)}.
* In a most straightforward implementation, each unit will be submitted to an {@link PrioritizedExecutorService}. Extensions,
* however, can implement their own logic for picking which unit to pick first for execution.
* <p>
* This interface extends {@link ListeningExecutorService} as well. It has a separate
* method to submit query execution tasks so that implementations can differentiate those tasks from any regular async
* tasks. One example is {@link org.apache.druid.query.groupby.strategy.GroupByStrategyV2#mergeRunners(QueryProcessingPool, Iterable)}
* where different kind of tasks are submitted to same processing pool.
* <p>
* Query execution task also includes a reference to {@link QueryRunner} so that any state required to decide the priority
* of a unit can be carried forward with the corresponding {@link QueryRunner}.
*/
@ExtensionPoint
public interface QueryProcessingPool extends ListeningExecutorService
{
/**
* Submits the query execution unit task for asynchronous execution.
*
* @param task - Task to be submitted.
* @param <T> - Task result type
* @param <V> - Query runner sequence type
* @return - Future object for tracking the task completion.
*/
<T, V> ListenableFuture<T> submitRunnerTask(PrioritizedQueryRunnerCallable<T, V> task);
}

View File

@ -44,6 +44,7 @@ public interface QueryRunnerFactory<T, QueryType extends Query<T>>
QueryRunner<T> createRunner(Segment segment);
/**
* @deprecated Use {@link #mergeRunners(QueryProcessingPool, Iterable)} instead.
* Runners generated with createRunner() and combined into an Iterable in (time,shardId) order are passed
* along to this method with an {@link ExecutorService}. The method should then return a {@link QueryRunner} that,
* when asked, will use the {@link ExecutorService} to run the base QueryRunners in some fashion.
@ -56,10 +57,38 @@ public interface QueryRunnerFactory<T, QueryType extends Query<T>>
*
* @param queryExecutor {@link ExecutorService} to be used for parallel processing
* @param queryRunners Individual {@link QueryRunner} objects that produce some results
* @return a {@link QueryRunner} that, when asked, will use the {@link ExecutorService} to run the base
* @return a {@link QueryRunner} that, when asked, will use the {@link ExecutorService} to run the base
* {@link QueryRunner} collection.
*/
QueryRunner<T> mergeRunners(ExecutorService queryExecutor, Iterable<QueryRunner<T>> queryRunners);
@Deprecated
default QueryRunner<T> mergeRunners(ExecutorService queryExecutor, Iterable<QueryRunner<T>> queryRunners)
{
return mergeRunners(new ForwardingQueryProcessingPool(queryExecutor), queryRunners);
}
/**
* Runners generated with createRunner() and combined into an Iterable in (time,shardId) order are passed
* along to this method with an {@link QueryProcessingPool}. The method should then return a {@link QueryRunner} that,
* when asked, will use the {@link QueryProcessingPool} to run the base QueryRunners in some fashion.
*
* The vast majority of the time, this should be implemented with {@link ChainedExecutionQueryRunner}:
*
* return new ChainedExecutionQueryRunner<>(queryProcessingPool, toolChest.getOrdering(), queryWatcher, queryRunners);
*
* Which will allow for parallel execution up to the maximum number of processing threads allowed.
*
* Unlike {@link #mergeRunners(ExecutorService, Iterable)}, this method takes a {@link QueryProcessingPool} instead
* which allows custom implementations for prioritize query execution on segments.
*
* @param queryProcessingPool {@link QueryProcessingPool} to be used for parallel processing
* @param queryRunners Individual {@link QueryRunner} objects that produce some results
* @return a {@link QueryRunner} that, when asked, will use the {@link ExecutorService} to run the base
* {@link QueryRunner} collection.
*/
QueryRunner<T> mergeRunners(
QueryProcessingPool queryProcessingPool,
Iterable<QueryRunner<T>> queryRunners
);
/**
* Provides access to the {@link QueryToolChest} for this specific {@link Query} type.

View File

@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.ChainedExecutionQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
@ -36,7 +37,6 @@ import org.apache.druid.segment.Segment;
import org.apache.druid.segment.StorageAdapter;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
/**
*/
@ -64,11 +64,11 @@ public class DataSourceMetadataQueryRunnerFactory
@Override
public QueryRunner<Result<DataSourceMetadataResultValue>> mergeRunners(
ExecutorService queryExecutor,
QueryProcessingPool queryProcessingPool,
Iterable<QueryRunner<Result<DataSourceMetadataResultValue>>> queryRunners
)
{
return new ChainedExecutionQueryRunner<>(queryExecutor, queryWatcher, queryRunners);
return new ChainedExecutionQueryRunner<>(queryProcessingPool, queryWatcher, queryRunners);
}
@Override

View File

@ -20,13 +20,12 @@
package org.apache.druid.query.groupby;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
@ -35,8 +34,6 @@ import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.StorageAdapter;
import java.util.concurrent.ExecutorService;
/**
*
*/
@ -63,13 +60,10 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<ResultRow,
@Override
public QueryRunner<ResultRow> mergeRunners(
final ExecutorService exec,
final QueryProcessingPool queryProcessingPool,
final Iterable<QueryRunner<ResultRow>> queryRunners
)
{
// mergeRunners should take ListeningExecutorService at some point
final ListeningExecutorService queryExecutor = MoreExecutors.listeningDecorator(exec);
return new QueryRunner<ResultRow>()
{
@Override
@ -77,7 +71,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory<ResultRow,
{
QueryRunner<ResultRow> rowQueryRunner = strategySelector
.strategize((GroupByQuery) queryPlus.getQuery())
.mergeRunners(queryExecutor, queryRunners);
.mergeRunners(queryProcessingPool, queryRunners);
return rowQueryRunner.run(queryPlus, responseContext);
}
};

View File

@ -29,8 +29,6 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.collections.Releaser;
@ -43,11 +41,12 @@ import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.AbstractPrioritizedCallable;
import org.apache.druid.query.AbstractPrioritizedQueryRunnerCallable;
import org.apache.druid.query.ChainedExecutionQueryRunner;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryWatcher;
@ -64,7 +63,6 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -80,7 +78,7 @@ import java.util.concurrent.TimeoutException;
* similarities and differences.
*
* Used by
* {@link org.apache.druid.query.groupby.strategy.GroupByStrategyV2#mergeRunners(ListeningExecutorService, Iterable)}.
* {@link org.apache.druid.query.groupby.strategy.GroupByStrategyV2#mergeRunners(QueryProcessingPool, Iterable)}
*/
public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
{
@ -89,7 +87,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
private final GroupByQueryConfig config;
private final Iterable<QueryRunner<ResultRow>> queryables;
private final ListeningExecutorService exec;
private final QueryProcessingPool queryProcessingPool;
private final QueryWatcher queryWatcher;
private final int concurrencyHint;
private final BlockingPool<ByteBuffer> mergeBufferPool;
@ -99,7 +97,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
public GroupByMergingQueryRunnerV2(
GroupByQueryConfig config,
ExecutorService exec,
QueryProcessingPool queryProcessingPool,
QueryWatcher queryWatcher,
Iterable<QueryRunner<ResultRow>> queryables,
int concurrencyHint,
@ -110,7 +108,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
)
{
this.config = config;
this.exec = MoreExecutors.listeningDecorator(exec);
this.queryProcessingPool = queryProcessingPool;
this.queryWatcher = queryWatcher;
this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
this.concurrencyHint = concurrencyHint;
@ -142,7 +140,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
.withoutThreadUnsafeState();
if (QueryContexts.isBySegment(query) || forceChainedExecution) {
ChainedExecutionQueryRunner<ResultRow> runner = new ChainedExecutionQueryRunner<>(exec, queryWatcher, queryables);
ChainedExecutionQueryRunner<ResultRow> runner = new ChainedExecutionQueryRunner<>(queryProcessingPool, queryWatcher, queryables);
return runner.run(queryPlusForRunners, responseContext);
}
@ -203,7 +201,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
concurrencyHint,
temporaryStorage,
spillMapper,
exec,
queryProcessingPool, // Passed as executor service
priority,
hasTimeout,
timeoutAt,
@ -229,8 +227,8 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
throw new ISE("Null queryRunner! Looks to be some segment unmapping action happening");
}
ListenableFuture<AggregateResult> future = exec.submit(
new AbstractPrioritizedCallable<AggregateResult>(priority)
ListenableFuture<AggregateResult> future = queryProcessingPool.submitRunnerTask(
new AbstractPrioritizedQueryRunnerCallable<AggregateResult, ResultRow>(priority, input)
{
@Override
public AggregateResult call()
@ -244,7 +242,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<ResultRow>
) {
// Return true if OK, false if resources were exhausted.
return input.run(queryPlusForRunners, responseContext)
.accumulate(AggregateResult.ok(), accumulator);
.accumulate(AggregateResult.ok(), accumulator);
}
catch (QueryInterruptedException | QueryTimeoutException e) {
throw e;

View File

@ -19,10 +19,10 @@
package org.apache.druid.query.groupby.strategy;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.context.ResponseContext;
@ -33,8 +33,8 @@ import org.apache.druid.query.groupby.resource.GroupByQueryResource;
import org.apache.druid.segment.StorageAdapter;
import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.concurrent.ExecutorService;
import java.util.function.BinaryOperator;
public interface GroupByStrategy
@ -56,7 +56,7 @@ public interface GroupByStrategy
*
* Used by {@link GroupByQueryQueryToolChest#getCacheStrategy(GroupByQuery)}.
*
* @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(ExecutorService, Iterable)} will be
* @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)} will be
* called on the cached by-segment results. Can be used to distinguish if we are running on
* a broker or data node.
*
@ -163,18 +163,17 @@ public interface GroupByStrategy
/**
* Merge a variety of single-segment query runners into a combined runner. Used by
* {@link org.apache.druid.query.groupby.GroupByQueryRunnerFactory#mergeRunners(ExecutorService, Iterable)}. In
* {@link org.apache.druid.query.groupby.GroupByQueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)}. In
* that sense, it is intended to go along with {@link #process(GroupByQuery, StorageAdapter)} (the runners created
* by that method will be fed into this method).
*
* <p>
* This method is only called on data servers, like Historicals (not the Broker).
*
* @param exec executor service used for parallel execution of the query runners
* @param queryRunners collection of query runners to merge
*
* @param queryProcessingPool {@link QueryProcessingPool} service used for parallel execution of the query runners
* @param queryRunners collection of query runners to merge
* @return merged query runner
*/
QueryRunner<ResultRow> mergeRunners(ListeningExecutorService exec, Iterable<QueryRunner<ResultRow>> queryRunners);
QueryRunner<ResultRow> mergeRunners(QueryProcessingPool queryProcessingPool, Iterable<QueryRunner<ResultRow>> queryRunners);
/**
* Process a groupBy query on a single {@link StorageAdapter}. This is used by

View File

@ -25,7 +25,6 @@ import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.inject.Inject;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.guice.annotations.Global;
@ -34,6 +33,7 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.GroupByMergedQueryRunner;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.aggregation.AggregatorFactory;
@ -270,11 +270,11 @@ public class GroupByStrategyV1 implements GroupByStrategy
@Override
public QueryRunner<ResultRow> mergeRunners(
final ListeningExecutorService exec,
final QueryProcessingPool queryProcessingPool,
final Iterable<QueryRunner<ResultRow>> queryRunners
)
{
return new GroupByMergedQueryRunner<>(exec, configSupplier, queryWatcher, bufferPool, queryRunners);
return new GroupByMergedQueryRunner<>(queryProcessingPool, configSupplier, queryWatcher, bufferPool, queryRunners);
}
@Override

View File

@ -25,7 +25,6 @@ import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.inject.Inject;
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.NonBlockingPool;
@ -47,6 +46,7 @@ import org.apache.druid.query.QueryCapacityExceededException;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.ResourceLimitExceededException;
@ -557,13 +557,13 @@ public class GroupByStrategyV2 implements GroupByStrategy
@Override
public QueryRunner<ResultRow> mergeRunners(
final ListeningExecutorService exec,
final QueryProcessingPool queryProcessingPool,
final Iterable<QueryRunner<ResultRow>> queryRunners
)
{
return new GroupByMergingQueryRunnerV2(
configSupplier.get(),
exec,
queryProcessingPool,
queryWatcher,
queryRunners,
processingConfig.getNumThreads(),

View File

@ -21,8 +21,6 @@ package org.apache.druid.query.metadata;
import com.google.common.base.Function;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.StringUtils;
@ -30,12 +28,13 @@ import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.AbstractPrioritizedCallable;
import org.apache.druid.query.AbstractPrioritizedQueryRunnerCallable;
import org.apache.druid.query.ConcatQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryTimeoutException;
@ -58,7 +57,6 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -186,11 +184,10 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
@Override
public QueryRunner<SegmentAnalysis> mergeRunners(
ExecutorService exec,
QueryProcessingPool queryProcessingPool,
Iterable<QueryRunner<SegmentAnalysis>> queryRunners
)
{
final ListeningExecutorService queryExecutor = MoreExecutors.listeningDecorator(exec);
return new ConcatQueryRunner<SegmentAnalysis>(
Sequences.map(
Sequences.simple(queryRunners),
@ -210,8 +207,8 @@ public class SegmentMetadataQueryRunnerFactory implements QueryRunnerFactory<Seg
final Query<SegmentAnalysis> query = queryPlus.getQuery();
final int priority = QueryContexts.getPriority(query);
final QueryPlus<SegmentAnalysis> threadSafeQueryPlus = queryPlus.withoutThreadUnsafeState();
final ListenableFuture<Sequence<SegmentAnalysis>> future = queryExecutor.submit(
new AbstractPrioritizedCallable<Sequence<SegmentAnalysis>>(priority)
ListenableFuture<Sequence<SegmentAnalysis>> future = queryProcessingPool.submitRunnerTask(
new AbstractPrioritizedQueryRunnerCallable<Sequence<SegmentAnalysis>, SegmentAnalysis>(priority, input)
{
@Override
public Sequence<SegmentAnalysis> call()

View File

@ -35,6 +35,7 @@ import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
@ -54,7 +55,6 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
@ -83,7 +83,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
@Override
public QueryRunner<ScanResultValue> mergeRunners(
final ExecutorService queryExecutor,
final QueryProcessingPool queryProcessingPool,
final Iterable<QueryRunner<ScanResultValue>> queryRunners
)
{

View File

@ -21,6 +21,7 @@ package org.apache.druid.query.search;
import com.google.inject.Inject;
import org.apache.druid.query.ChainedExecutionQueryRunner;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
@ -28,8 +29,6 @@ import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.Result;
import org.apache.druid.segment.Segment;
import java.util.concurrent.ExecutorService;
/**
*/
public class SearchQueryRunnerFactory implements QueryRunnerFactory<Result<SearchResultValue>, SearchQuery>
@ -58,11 +57,11 @@ public class SearchQueryRunnerFactory implements QueryRunnerFactory<Result<Searc
@Override
public QueryRunner<Result<SearchResultValue>> mergeRunners(
ExecutorService queryExecutor,
QueryProcessingPool queryProcessingPool,
Iterable<QueryRunner<Result<SearchResultValue>>> queryRunners
)
{
return new ChainedExecutionQueryRunner<>(queryExecutor, queryWatcher, queryRunners);
return new ChainedExecutionQueryRunner<>(queryProcessingPool, queryWatcher, queryRunners);
}
@Override

View File

@ -29,6 +29,7 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.ChainedExecutionQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerHelper;
@ -47,7 +48,6 @@ import org.joda.time.DateTime;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
/**
*/
@ -71,11 +71,11 @@ public class TimeBoundaryQueryRunnerFactory
@Override
public QueryRunner<Result<TimeBoundaryResultValue>> mergeRunners(
ExecutorService queryExecutor,
QueryProcessingPool queryProcessingPool,
Iterable<QueryRunner<Result<TimeBoundaryResultValue>>> queryRunners
)
{
return new ChainedExecutionQueryRunner<>(queryExecutor, queryWatcher, queryRunners);
return new ChainedExecutionQueryRunner<>(queryProcessingPool, queryWatcher, queryRunners);
}
@Override

View File

@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.ChainedExecutionQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
@ -34,8 +35,6 @@ import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.StorageAdapter;
import java.util.concurrent.ExecutorService;
/**
*/
public class TimeseriesQueryRunnerFactory
@ -65,11 +64,11 @@ public class TimeseriesQueryRunnerFactory
@Override
public QueryRunner<Result<TimeseriesResultValue>> mergeRunners(
ExecutorService queryExecutor,
QueryProcessingPool queryProcessingPool,
Iterable<QueryRunner<Result<TimeseriesResultValue>>> queryRunners
)
{
return new ChainedExecutionQueryRunner<>(queryExecutor, queryWatcher, queryRunners);
return new ChainedExecutionQueryRunner<>(queryProcessingPool, queryWatcher, queryRunners);
}
@Override

View File

@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.ChainedExecutionQueryRunner;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
@ -35,7 +36,6 @@ import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.Segment;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
/**
*/
@ -82,11 +82,11 @@ public class TopNQueryRunnerFactory implements QueryRunnerFactory<Result<TopNRes
@Override
public QueryRunner<Result<TopNResultValue>> mergeRunners(
ExecutorService queryExecutor,
QueryProcessingPool queryProcessingPool,
Iterable<QueryRunner<Result<TopNResultValue>>> queryRunners
)
{
return new ChainedExecutionQueryRunner<>(queryExecutor, queryWatcher, queryRunners);
return new ChainedExecutionQueryRunner<>(queryProcessingPool, queryWatcher, queryRunners);
}
@Override

View File

@ -22,6 +22,7 @@ package org.apache.druid.query;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
@ -29,14 +30,20 @@ import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
@ -47,6 +54,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
public class ChainedExecutionQueryRunnerTest
{
@ -112,10 +120,10 @@ public class ChainedExecutionQueryRunnerTest
);
ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner<>(
exec,
new ForwardingQueryProcessingPool(exec),
watcher,
Lists.newArrayList(
runners
runners
)
);
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
@ -236,7 +244,7 @@ public class ChainedExecutionQueryRunnerTest
);
ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner<>(
exec,
new ForwardingQueryProcessingPool(exec),
watcher,
Lists.newArrayList(
runners
@ -306,6 +314,35 @@ public class ChainedExecutionQueryRunnerTest
EasyMock.verify(watcher);
}
@Test
public void testSubmittedTaskType()
{
QueryProcessingPool queryProcessingPool = Mockito.mock(QueryProcessingPool.class);
QueryWatcher watcher = EasyMock.createStrictMock(QueryWatcher.class);
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.intervals("2014/2015")
.aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
.context(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 100, "queryId", "test"))
.build();
List<QueryRunner<Result<TimeseriesResultValue>>> runners = Arrays.asList(
Mockito.mock(QueryRunner.class),
Mockito.mock(QueryRunner.class)
);
ChainedExecutionQueryRunner<Result<TimeseriesResultValue>> chainedRunner = new ChainedExecutionQueryRunner<>(
queryProcessingPool,
watcher,
runners
);
Mockito.when(queryProcessingPool.submitRunnerTask(ArgumentMatchers.any())).thenReturn(Futures.immediateFuture(Collections.singletonList(123)));
chainedRunner.run(QueryPlus.wrap(query)).toList();
ArgumentCaptor<PrioritizedQueryRunnerCallable> captor = ArgumentCaptor.forClass(PrioritizedQueryRunnerCallable.class);
Mockito.verify(queryProcessingPool, Mockito.times(2)).submitRunnerTask(captor.capture());
List<QueryRunner> actual = captor.getAllValues().stream().map(PrioritizedQueryRunnerCallable::getRunner).collect(Collectors.toList());
Assert.assertEquals(runners, actual);
}
private class DyingQueryRunner implements QueryRunner<Integer>
{
private final CountDownLatch start;

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.List;
public class MetricsEmittingQueryProcessingPoolTest
{
@Test
public void testPrioritizedExecutorDelegate()
{
PrioritizedExecutorService service = Mockito.mock(PrioritizedExecutorService.class);
Mockito.when(service.getQueueSize()).thenReturn(10);
ExecutorServiceMonitor monitor = new ExecutorServiceMonitor();
List<Event> events = new ArrayList<>();
MetricsEmittingQueryProcessingPool processingPool = new MetricsEmittingQueryProcessingPool(service, monitor);
Assert.assertSame(service, processingPool.delegate());
ServiceEmitter serviceEmitter = new ServiceEmitter("service", "host", Mockito.mock(Emitter.class))
{
@Override
public void emit(Event event)
{
events.add(event);
}
};
monitor.doMonitor(serviceEmitter);
Assert.assertEquals(1, events.size());
Assert.assertEquals(((ServiceMetricEvent) (events.get(0))).getMetric(), "segment/scan/pending");
Assert.assertEquals(((ServiceMetricEvent) (events.get(0))).getValue(), 10);
}
@Test
public void testNonPrioritizedExecutorDelegate()
{
ListeningExecutorService service = Mockito.mock(ListeningExecutorService.class);
ExecutorServiceMonitor monitor = new ExecutorServiceMonitor();
MetricsEmittingQueryProcessingPool processingPool = new MetricsEmittingQueryProcessingPool(service, monitor);
Assert.assertSame(service, processingPool.delegate());
ServiceEmitter serviceEmitter = Mockito.mock(ServiceEmitter.class);
monitor.doMonitor(serviceEmitter);
Mockito.verifyNoInteractions(serviceEmitter);
}
}

View File

@ -52,6 +52,7 @@ import org.apache.druid.js.JavaScriptConfig;
import org.apache.druid.query.BySegmentResultValue;
import org.apache.druid.query.BySegmentResultValueClass;
import org.apache.druid.query.ChainedExecutionQueryRunner;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.QueryContexts;
@ -10960,7 +10961,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
);
ChainedExecutionQueryRunner ceqr = new ChainedExecutionQueryRunner(
Execs.directExecutor(),
DirectQueryProcessingPool.INSTANCE,
(query1, future) -> {
return;
},

View File

@ -36,7 +36,6 @@ import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.collections.StupidPool;
import org.apache.druid.guice.annotations.Global;
import org.apache.druid.guice.annotations.Merging;
import org.apache.druid.guice.annotations.Processing;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.ExecutorServiceConfig;
@ -45,8 +44,9 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.offheap.OffheapBufferGenerator;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.ExecutorServiceMonitor;
import org.apache.druid.query.MetricsEmittingExecutorService;
import org.apache.druid.query.MetricsEmittingQueryProcessingPool;
import org.apache.druid.query.PrioritizedExecutorService;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.server.metrics.MetricsModule;
import org.apache.druid.utils.JvmUtils;
@ -93,15 +93,14 @@ public class DruidProcessingModule implements Module
}
@Provides
@Processing
@ManageLifecycle
public ExecutorService getProcessingExecutorService(
public QueryProcessingPool getProcessingExecutorPool(
DruidProcessingConfig config,
ExecutorServiceMonitor executorServiceMonitor,
Lifecycle lifecycle
)
{
return new MetricsEmittingExecutorService(
return new MetricsEmittingQueryProcessingPool(
PrioritizedExecutorService.create(
lifecycle,
config

View File

@ -28,16 +28,16 @@ import org.apache.druid.collections.DummyNonBlockingPool;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.guice.annotations.Global;
import org.apache.druid.guice.annotations.Merging;
import org.apache.druid.guice.annotations.Processing;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ExecutorServiceConfig;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.ExecutorServiceMonitor;
import org.apache.druid.query.ForwardingQueryProcessingPool;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.server.metrics.MetricsModule;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
/**
* This module is used to fulfill dependency injection of query processing and caching resources: buffer pools and
@ -58,14 +58,13 @@ public class RouterProcessingModule implements Module
}
@Provides
@Processing
@ManageLifecycle
public ExecutorService getProcessingExecutorService(DruidProcessingConfig config)
public QueryProcessingPool getProcessingExecutorPool(DruidProcessingConfig config)
{
if (config.getNumThreadsConfigured() != ExecutorServiceConfig.DEFAULT_NUM_THREADS) {
log.error("numThreads[%d] configured, that is ignored on Router", config.getNumThreadsConfigured());
}
return Execs.dummy();
return new ForwardingQueryProcessingPool(Execs.dummy());
}
@Provides

View File

@ -25,6 +25,7 @@ import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
@ -38,8 +39,6 @@ import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import java.util.concurrent.ExecutorService;
public class Appenderators
{
public static Appenderator createRealtime(
@ -54,7 +53,7 @@ public class Appenderators
QueryRunnerFactoryConglomerate conglomerate,
DataSegmentAnnouncer segmentAnnouncer,
ServiceEmitter emitter,
ExecutorService queryExecutorService,
QueryProcessingPool queryProcessingPool,
JoinableFactory joinableFactory,
Cache cache,
CacheConfig cacheConfig,
@ -79,7 +78,7 @@ public class Appenderators
objectMapper,
emitter,
conglomerate,
queryExecutorService,
queryProcessingPool,
joinableFactory,
Preconditions.checkNotNull(cache, "cache"),
cacheConfig,

View File

@ -25,6 +25,7 @@ import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.SegmentDescriptor;
@ -39,8 +40,6 @@ import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Interval;
import java.util.concurrent.ExecutorService;
/**
* This interface defines entities that create and manage potentially multiple {@link Appenderator} instances.
*
@ -76,7 +75,7 @@ public interface AppenderatorsManager
QueryRunnerFactoryConglomerate conglomerate,
DataSegmentAnnouncer segmentAnnouncer,
ServiceEmitter emitter,
ExecutorService queryExecutorService,
QueryProcessingPool queryProcessingPool,
JoinableFactory joinableFactory,
Cache cache,
CacheConfig cacheConfig,

View File

@ -25,8 +25,8 @@ import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Processing;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
@ -42,14 +42,13 @@ import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.timeline.partition.ShardSpec;
import java.io.File;
import java.util.concurrent.ExecutorService;
public class DefaultRealtimeAppenderatorFactory implements AppenderatorFactory
{
private final ServiceEmitter emitter;
private final QueryRunnerFactoryConglomerate conglomerate;
private final DataSegmentAnnouncer segmentAnnouncer;
private final ExecutorService queryExecutorService;
private final QueryProcessingPool queryProcessingPool;
private final JoinableFactory joinableFactory;
private final DataSegmentPusher dataSegmentPusher;
private final ObjectMapper jsonMapper;
@ -63,7 +62,7 @@ public class DefaultRealtimeAppenderatorFactory implements AppenderatorFactory
@JacksonInject ServiceEmitter emitter,
@JacksonInject QueryRunnerFactoryConglomerate conglomerate,
@JacksonInject DataSegmentAnnouncer segmentAnnouncer,
@JacksonInject @Processing ExecutorService queryExecutorService,
@JacksonInject QueryProcessingPool queryProcessingPool,
@JacksonInject JoinableFactory joinableFactory,
@JacksonInject DataSegmentPusher dataSegmentPusher,
@JacksonInject @Json ObjectMapper jsonMapper,
@ -77,7 +76,7 @@ public class DefaultRealtimeAppenderatorFactory implements AppenderatorFactory
this.emitter = emitter;
this.conglomerate = conglomerate;
this.segmentAnnouncer = segmentAnnouncer;
this.queryExecutorService = queryExecutorService;
this.queryProcessingPool = queryProcessingPool;
this.joinableFactory = joinableFactory;
this.dataSegmentPusher = dataSegmentPusher;
this.jsonMapper = jsonMapper;
@ -114,7 +113,7 @@ public class DefaultRealtimeAppenderatorFactory implements AppenderatorFactory
conglomerate,
segmentAnnouncer,
emitter,
queryExecutorService,
queryProcessingPool,
joinableFactory,
cache,
cacheConfig,

View File

@ -26,6 +26,7 @@ import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.SegmentDescriptor;
@ -40,8 +41,6 @@ import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Interval;
import java.util.concurrent.ExecutorService;
/**
* This implementation is needed because Overlords and MiddleManagers operate on Task objects which
* can require an AppenderatorsManager to be injected.
@ -67,7 +66,7 @@ public class DummyForInjectionAppenderatorsManager implements AppenderatorsManag
QueryRunnerFactoryConglomerate conglomerate,
DataSegmentAnnouncer segmentAnnouncer,
ServiceEmitter emitter,
ExecutorService queryExecutorService,
QueryProcessingPool queryProcessingPool,
JoinableFactory joinableFactory,
Cache cache,
CacheConfig cacheConfig,

View File

@ -26,6 +26,7 @@ import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.SegmentDescriptor;
@ -40,8 +41,6 @@ import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Interval;
import java.util.concurrent.ExecutorService;
/**
* Manages Appenderators for tasks running within a CliPeon process.
*
@ -73,7 +72,7 @@ public class PeonAppenderatorsManager implements AppenderatorsManager
QueryRunnerFactoryConglomerate conglomerate,
DataSegmentAnnouncer segmentAnnouncer,
ServiceEmitter emitter,
ExecutorService queryExecutorService,
QueryProcessingPool queryProcessingPool,
JoinableFactory joinableFactory,
Cache cache,
CacheConfig cacheConfig,
@ -99,7 +98,7 @@ public class PeonAppenderatorsManager implements AppenderatorsManager
conglomerate,
segmentAnnouncer,
emitter,
queryExecutorService,
queryProcessingPool,
joinableFactory,
cache,
cacheConfig,

View File

@ -32,19 +32,20 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.CPUTimeMetricQueryRunner;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.MetricsEmittingQueryRunner;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
@ -71,7 +72,6 @@ import org.joda.time.Interval;
import java.io.Closeable;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
@ -89,7 +89,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
private final ObjectMapper objectMapper;
private final ServiceEmitter emitter;
private final QueryRunnerFactoryConglomerate conglomerate;
private final ExecutorService queryExecutorService;
private final QueryProcessingPool queryProcessingPool;
private final JoinableFactoryWrapper joinableFactoryWrapper;
private final Cache cache;
private final CacheConfig cacheConfig;
@ -101,7 +101,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
ObjectMapper objectMapper,
ServiceEmitter emitter,
QueryRunnerFactoryConglomerate conglomerate,
ExecutorService queryExecutorService,
QueryProcessingPool queryProcessingPool,
JoinableFactory joinableFactory,
Cache cache,
CacheConfig cacheConfig,
@ -113,7 +113,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
this.emitter = Preconditions.checkNotNull(emitter, "emitter");
this.conglomerate = Preconditions.checkNotNull(conglomerate, "conglomerate");
this.queryExecutorService = Preconditions.checkNotNull(queryExecutorService, "queryExecutorService");
this.queryProcessingPool = Preconditions.checkNotNull(queryProcessingPool, "queryProcessingPool");
this.joinableFactoryWrapper = new JoinableFactoryWrapper(joinableFactory);
this.cache = Preconditions.checkNotNull(cache, "cache");
this.cacheConfig = Preconditions.checkNotNull(cacheConfig, "cacheConfig");
@ -272,7 +272,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
sinkSegmentId,
descriptor.getInterval().getStart(),
factory.mergeRunners(
Execs.directExecutor(),
DirectQueryProcessingPool.INSTANCE,
perHydrantRunners
)
),
@ -287,7 +287,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
final QueryRunner<T> mergedRunner =
toolChest.mergeResults(
factory.mergeRunners(
queryExecutorService,
queryProcessingPool,
perSegmentRunners
)
);

View File

@ -31,7 +31,6 @@ import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.guice.annotations.Processing;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.IAE;
@ -41,6 +40,7 @@ import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.SegmentDescriptor;
@ -76,7 +76,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
/**
* Manages {@link Appenderator} instances for the CliIndexer task execution service, which runs all tasks in
@ -108,7 +107,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
private final Map<String, DatasourceBundle> datasourceBundles = new HashMap<>();
private final ExecutorService queryExecutorService;
private final QueryProcessingPool queryProcessingPool;
private final JoinableFactory joinableFactory;
private final WorkerConfig workerConfig;
private final Cache cache;
@ -122,7 +121,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
@Inject
public UnifiedIndexerAppenderatorsManager(
@Processing ExecutorService queryExecutorService,
QueryProcessingPool queryProcessingPool,
JoinableFactory joinableFactory,
WorkerConfig workerConfig,
Cache cache,
@ -133,7 +132,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider
)
{
this.queryExecutorService = queryExecutorService;
this.queryProcessingPool = queryProcessingPool;
this.joinableFactory = joinableFactory;
this.workerConfig = workerConfig;
this.cache = cache;
@ -161,7 +160,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
QueryRunnerFactoryConglomerate conglomerate,
DataSegmentAnnouncer segmentAnnouncer,
ServiceEmitter emitter,
ExecutorService queryExecutorService,
QueryProcessingPool queryProcessingPool,
JoinableFactory joinableFactory,
Cache cache,
CacheConfig cacheConfig,
@ -347,7 +346,7 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
objectMapper,
serviceEmitter,
queryRunnerFactoryConglomerateProvider.get(),
queryExecutorService,
queryProcessingPool,
joinableFactory,
Preconditions.checkNotNull(cache, "cache"),
cacheConfig,

View File

@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
@ -46,7 +47,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
/**
@ -70,7 +70,7 @@ public class FlushingPlumber extends RealtimePlumber
ServiceEmitter emitter,
QueryRunnerFactoryConglomerate conglomerate,
DataSegmentAnnouncer segmentAnnouncer,
ExecutorService queryExecutorService,
QueryProcessingPool queryProcessingPool,
JoinableFactory joinableFactory,
IndexMerger indexMerger,
IndexIO indexIO,
@ -88,7 +88,7 @@ public class FlushingPlumber extends RealtimePlumber
emitter,
conglomerate,
segmentAnnouncer,
queryExecutorService,
queryProcessingPool,
joinableFactory,
null,
null,

View File

@ -27,8 +27,8 @@ import com.google.common.base.Preconditions;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.guice.annotations.Processing;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
@ -39,8 +39,6 @@ import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.Duration;
import java.util.concurrent.ExecutorService;
/**
* This plumber just drops segments at the end of a flush duration instead of handing them off. It is only useful if you want to run
* a real time node without the rest of the Druid cluster.
@ -54,7 +52,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
private final ServiceEmitter emitter;
private final QueryRunnerFactoryConglomerate conglomerate;
private final DataSegmentAnnouncer segmentAnnouncer;
private final ExecutorService queryExecutorService;
private final QueryProcessingPool queryProcessingPool;
private final JoinableFactory joinableFactory;
private final IndexMergerV9 indexMergerV9;
private final IndexIO indexIO;
@ -69,7 +67,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
@JacksonInject ServiceEmitter emitter,
@JacksonInject QueryRunnerFactoryConglomerate conglomerate,
@JacksonInject DataSegmentAnnouncer segmentAnnouncer,
@JacksonInject @Processing ExecutorService queryExecutorService,
@JacksonInject QueryProcessingPool queryProcessingPool,
@JacksonInject JoinableFactory joinableFactory,
@JacksonInject IndexMergerV9 indexMergerV9,
@JacksonInject IndexIO indexIO,
@ -86,7 +84,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
segmentAnnouncer,
null,
null,
queryExecutorService,
queryProcessingPool,
joinableFactory,
indexMergerV9,
indexIO,
@ -100,7 +98,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
this.emitter = emitter;
this.conglomerate = conglomerate;
this.segmentAnnouncer = segmentAnnouncer;
this.queryExecutorService = queryExecutorService;
this.queryProcessingPool = queryProcessingPool;
this.joinableFactory = joinableFactory;
this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9");
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
@ -127,7 +125,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
emitter,
conglomerate,
segmentAnnouncer,
queryExecutorService,
queryProcessingPool,
joinableFactory,
indexMergerV9,
indexIO,

View File

@ -47,6 +47,7 @@ import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker;
@ -138,7 +139,7 @@ public class RealtimePlumber implements Plumber
ServiceEmitter emitter,
QueryRunnerFactoryConglomerate conglomerate,
DataSegmentAnnouncer segmentAnnouncer,
ExecutorService queryExecutorService,
QueryProcessingPool queryProcessingPool,
JoinableFactory joinableFactory,
DataSegmentPusher dataSegmentPusher,
SegmentPublisher segmentPublisher,
@ -168,7 +169,7 @@ public class RealtimePlumber implements Plumber
objectMapper,
emitter,
conglomerate,
queryExecutorService,
queryProcessingPool,
joinableFactory,
cache,
cacheConfig,

View File

@ -26,8 +26,8 @@ import com.google.common.base.Preconditions;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.guice.annotations.Processing;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
@ -40,8 +40,6 @@ import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.SegmentPublisher;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import java.util.concurrent.ExecutorService;
/**
*
*/
@ -53,7 +51,7 @@ public class RealtimePlumberSchool implements PlumberSchool
private final DataSegmentAnnouncer segmentAnnouncer;
private final SegmentPublisher segmentPublisher;
private final SegmentHandoffNotifierFactory handoffNotifierFactory;
private final ExecutorService queryExecutorService;
private final QueryProcessingPool queryProcessingPool;
private final JoinableFactory joinableFactory;
private final IndexMergerV9 indexMergerV9;
private final IndexIO indexIO;
@ -70,7 +68,7 @@ public class RealtimePlumberSchool implements PlumberSchool
@JacksonInject DataSegmentAnnouncer segmentAnnouncer,
@JacksonInject SegmentPublisher segmentPublisher,
@JacksonInject SegmentHandoffNotifierFactory handoffNotifierFactory,
@JacksonInject @Processing ExecutorService executorService,
@JacksonInject QueryProcessingPool queryProcessingPool,
@JacksonInject JoinableFactory joinableFactory,
@JacksonInject IndexMergerV9 indexMergerV9,
@JacksonInject IndexIO indexIO,
@ -86,7 +84,7 @@ public class RealtimePlumberSchool implements PlumberSchool
this.segmentAnnouncer = segmentAnnouncer;
this.segmentPublisher = segmentPublisher;
this.handoffNotifierFactory = handoffNotifierFactory;
this.queryExecutorService = executorService;
this.queryProcessingPool = queryProcessingPool;
this.joinableFactory = joinableFactory;
this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9");
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
@ -113,7 +111,7 @@ public class RealtimePlumberSchool implements PlumberSchool
emitter,
conglomerate,
segmentAnnouncer,
queryExecutorService,
queryProcessingPool,
joinableFactory,
dataSegmentPusher,
segmentPublisher,

View File

@ -22,9 +22,9 @@ package org.apache.druid.server;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.FluentQueryRunnerBuilder;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
@ -104,7 +104,7 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker
final QueryRunnerFactory<T, Query<T>> queryRunnerFactory = conglomerate.findFactory(query);
final QueryRunner<T> baseRunner = queryRunnerFactory.mergeRunners(
Execs.directExecutor(),
DirectQueryProcessingPool.INSTANCE,
() -> StreamSupport.stream(segments.spliterator(), false)
.map(segmentMapFn)
.map(queryRunnerFactory::createRunner).iterator()

View File

@ -26,7 +26,6 @@ import org.apache.druid.client.CachingQueryRunner;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulator;
import org.apache.druid.guice.annotations.Processing;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
@ -44,6 +43,7 @@ import org.apache.druid.query.PerSegmentQueryOptimizationContext;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
@ -72,7 +72,6 @@ import org.joda.time.Interval;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
@ -86,7 +85,7 @@ public class ServerManager implements QuerySegmentWalker
private static final EmittingLogger log = new EmittingLogger(ServerManager.class);
private final QueryRunnerFactoryConglomerate conglomerate;
private final ServiceEmitter emitter;
private final ExecutorService exec;
private final QueryProcessingPool queryProcessingPool;
private final CachePopulator cachePopulator;
private final Cache cache;
private final ObjectMapper objectMapper;
@ -99,7 +98,7 @@ public class ServerManager implements QuerySegmentWalker
public ServerManager(
QueryRunnerFactoryConglomerate conglomerate,
ServiceEmitter emitter,
@Processing ExecutorService exec,
QueryProcessingPool queryProcessingPool,
CachePopulator cachePopulator,
@Smile ObjectMapper objectMapper,
Cache cache,
@ -112,7 +111,7 @@ public class ServerManager implements QuerySegmentWalker
this.conglomerate = conglomerate;
this.emitter = emitter;
this.exec = exec;
this.queryProcessingPool = queryProcessingPool;
this.cachePopulator = cachePopulator;
this.cache = cache;
this.objectMapper = objectMapper;
@ -229,7 +228,7 @@ public class ServerManager implements QuerySegmentWalker
return CPUTimeMetricQueryRunner.safeBuild(
new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(factory.mergeRunners(exec, queryRunners)),
toolChest.mergeResults(factory.mergeRunners(queryProcessingPool, queryRunners)),
toolChest
),
toolChest,

View File

@ -37,6 +37,7 @@ import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.ForwardingQueryProcessingPool;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
@ -283,7 +284,7 @@ public class AppenderatorTester implements AutoCloseable
),
new NoopDataSegmentAnnouncer(),
emitter,
queryExecutor,
new ForwardingQueryProcessingPool(queryExecutor),
NoopJoinableFactory.INSTANCE,
MapCache.create(2048),
new CacheConfig(),

View File

@ -26,9 +26,9 @@ import org.apache.druid.client.cache.MapCache;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.Druids;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
@ -56,7 +56,7 @@ public class UnifiedIndexerAppenderatorsManagerTest
public final ExpectedException expectedException = ExpectedException.none();
private final UnifiedIndexerAppenderatorsManager manager = new UnifiedIndexerAppenderatorsManager(
Execs.directExecutor(),
DirectQueryProcessingPool.INSTANCE,
NoopJoinableFactory.INSTANCE,
new WorkerConfig(),
MapCache.create(10),

View File

@ -37,10 +37,10 @@ import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.QueryableIndex;
@ -230,7 +230,7 @@ public class RealtimePlumberSchoolTest extends InitializedNullHandlingTest
announcer,
segmentPublisher,
handoffNotifierFactory,
Execs.directExecutor(),
DirectQueryProcessingPool.INSTANCE,
NoopJoinableFactory.INSTANCE,
TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory),
TestHelper.getTestIndexIO(),

View File

@ -46,10 +46,12 @@ import org.apache.druid.query.ConcatQueryRunner;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DefaultQueryMetrics;
import org.apache.druid.query.Druids;
import org.apache.druid.query.ForwardingQueryProcessingPool;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
@ -101,6 +103,7 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@ -187,7 +190,7 @@ public class ServerManagerTest
}
},
new NoopServiceEmitter(),
serverManagerExec,
new ForwardingQueryProcessingPool(serverManagerExec),
new ForegroundCachePopulator(new DefaultObjectMapper(), new CachePopulatorStats(), -1),
new DefaultObjectMapper(),
new LocalCacheProvider().get(),
@ -737,7 +740,7 @@ public class ServerManagerTest
@Override
public QueryRunner<Result<SearchResultValue>> mergeRunners(
ExecutorService queryExecutor,
QueryProcessingPool queryProcessingPool,
Iterable<QueryRunner<Result<SearchResultValue>>> queryRunners
)
{

View File

@ -21,6 +21,7 @@ package org.apache.druid.cli;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
@ -45,11 +46,11 @@ import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
@ -477,14 +478,15 @@ public class DumpSegment extends GuiceRunnable
);
}
private static <T> Sequence<T> executeQuery(final Injector injector, final QueryableIndex index, final Query<T> query)
@VisibleForTesting
static <T> Sequence<T> executeQuery(final Injector injector, final QueryableIndex index, final Query<T> query)
{
final QueryRunnerFactoryConglomerate conglomerate = injector.getInstance(QueryRunnerFactoryConglomerate.class);
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
final QueryRunner<T> runner = factory.createRunner(new QueryableIndexSegment(index, SegmentId.dummy("segment")));
return factory
.getToolchest()
.mergeResults(factory.mergeRunners(Execs.directExecutor(), ImmutableList.of(runner)))
.mergeResults(factory.mergeRunners(DirectQueryProcessingPool.INSTANCE, ImmutableList.of(runner)))
.run(QueryPlus.wrap(query), ResponseContext.createEmpty());
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.cli;
import com.google.common.collect.ImmutableList;
import com.google.inject.Injector;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import java.util.Collections;
public class DumpSegmentTest
{
@Test
public void testExecuteQuery()
{
Injector injector = Mockito.mock(Injector.class);
QueryRunnerFactoryConglomerate conglomerate = Mockito.mock(QueryRunnerFactoryConglomerate.class);
QueryRunnerFactory factory = Mockito.mock(QueryRunnerFactory.class, Mockito.RETURNS_DEEP_STUBS);
QueryRunner runner = Mockito.mock(QueryRunner.class);
QueryRunner mergeRunner = Mockito.mock(QueryRunner.class);
Query query = Mockito.mock(Query.class);
Sequence expected = Sequences.simple(Collections.singletonList(123));
Mockito.when(injector.getInstance(QueryRunnerFactoryConglomerate.class)).thenReturn(conglomerate);
Mockito.when(conglomerate.findFactory(ArgumentMatchers.any())).thenReturn(factory);
Mockito.when(factory.createRunner(ArgumentMatchers.any())).thenReturn(runner);
Mockito.when(factory.getToolchest().mergeResults(factory.mergeRunners(DirectQueryProcessingPool.INSTANCE, ImmutableList.of(runner)))).thenReturn(mergeRunner);
Mockito.when(mergeRunner.run(ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(expected);
Sequence actual = DumpSegment.executeQuery(injector, null, query);
Assert.assertSame(expected, actual);
}
}