QueryCountStatsMonitor can be injected in the Peon (#10092)

* QueryCountStatsMonitor can be injected in the Peon

This change fixes a dependency injection bug where there is a circular
dependency on getting the MonitorScheduler when a user configures the
QueryCountStatsMonitor to be used.

* fix tests

* Actually fix the tests this time
This commit is contained in:
Suneet Saldanha 2020-06-29 21:03:07 -07:00 committed by GitHub
parent 69f2b1ef00
commit 363d0d86be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 59 additions and 36 deletions

View File

@ -2703,7 +2703,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
this::makeTimeseriesAndScanConglomerate,
Execs.directExecutor(), // queryExecutorService
NoopJoinableFactory.INSTANCE,
EasyMock.createMock(MonitorScheduler.class),
() -> EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
testUtils.getTestObjectMapper(),
testUtils.getTestIndexIO(),

View File

@ -2963,7 +2963,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
this::makeTimeseriesOnlyConglomerate,
Execs.directExecutor(), // queryExecutorService
NoopJoinableFactory.INSTANCE,
EasyMock.createMock(MonitorScheduler.class),
() -> EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
testUtils.getTestObjectMapper(),
testUtils.getTestIndexIO(),

View File

@ -39,6 +39,7 @@ import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.worker.IntermediaryDataManager;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.Monitor;
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.IndexIO;
@ -57,6 +58,7 @@ import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
@ -86,7 +88,8 @@ public class TaskToolbox
* because it may be unavailable, e. g. for batch tasks running in Spark or Hadoop.
*/
private final Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider;
private final MonitorScheduler monitorScheduler;
@Nullable
private final Provider<MonitorScheduler> monitorSchedulerProvider;
private final ExecutorService queryExecutorService;
private final JoinableFactory joinableFactory;
private final SegmentLoader segmentLoader;
@ -120,7 +123,7 @@ public class TaskToolbox
Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider,
ExecutorService queryExecutorService,
JoinableFactory joinableFactory,
MonitorScheduler monitorScheduler,
@Nullable Provider<MonitorScheduler> monitorSchedulerProvider,
SegmentLoader segmentLoader,
ObjectMapper jsonMapper,
File taskWorkDir,
@ -151,7 +154,7 @@ public class TaskToolbox
this.queryRunnerFactoryConglomerateProvider = queryRunnerFactoryConglomerateProvider;
this.queryExecutorService = queryExecutorService;
this.joinableFactory = joinableFactory;
this.monitorScheduler = monitorScheduler;
this.monitorSchedulerProvider = monitorSchedulerProvider;
this.segmentLoader = segmentLoader;
this.jsonMapper = jsonMapper;
this.taskWorkDir = taskWorkDir;
@ -239,9 +242,34 @@ public class TaskToolbox
return joinableFactory;
}
@Nullable
public MonitorScheduler getMonitorScheduler()
{
return monitorScheduler;
return monitorSchedulerProvider == null ? null : monitorSchedulerProvider.get();
}
/**
* Adds a monitor to the monitorScheduler if it is configured
* @param monitor
*/
public void addMonitor(Monitor monitor)
{
MonitorScheduler scheduler = getMonitorScheduler();
if (scheduler != null) {
scheduler.addMonitor(monitor);
}
}
/**
* Adds a monitor to the monitorScheduler if it is configured
* @param monitor
*/
public void removeMonitor(Monitor monitor)
{
MonitorScheduler scheduler = getMonitorScheduler();
if (scheduler != null) {
scheduler.removeMonitor(monitor);
}
}
public ObjectMapper getJsonMapper()

View File

@ -74,7 +74,7 @@ public class TaskToolboxFactory
private final Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider;
private final ExecutorService queryExecutorService;
private final JoinableFactory joinableFactory;
private final MonitorScheduler monitorScheduler;
private final Provider<MonitorScheduler> monitorSchedulerProvider;
private final SegmentLoaderFactory segmentLoaderFactory;
private final ObjectMapper jsonMapper;
private final IndexIO indexIO;
@ -105,7 +105,7 @@ public class TaskToolboxFactory
Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider,
@Processing ExecutorService queryExecutorService,
JoinableFactory joinableFactory,
MonitorScheduler monitorScheduler,
Provider<MonitorScheduler> monitorSchedulerProvider,
SegmentLoaderFactory segmentLoaderFactory,
@Json ObjectMapper jsonMapper,
IndexIO indexIO,
@ -135,7 +135,7 @@ public class TaskToolboxFactory
this.queryRunnerFactoryConglomerateProvider = queryRunnerFactoryConglomerateProvider;
this.queryExecutorService = queryExecutorService;
this.joinableFactory = joinableFactory;
this.monitorScheduler = monitorScheduler;
this.monitorSchedulerProvider = monitorSchedulerProvider;
this.segmentLoaderFactory = segmentLoaderFactory;
this.jsonMapper = jsonMapper;
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
@ -169,7 +169,7 @@ public class TaskToolboxFactory
queryRunnerFactoryConglomerateProvider,
queryExecutorService,
joinableFactory,
monitorScheduler,
monitorSchedulerProvider,
segmentLoaderFactory.manufacturate(taskWorkDir),
jsonMapper,
taskWorkDir,

View File

@ -327,7 +327,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
);
// Set up metrics emission
toolbox.getMonitorScheduler().addMonitor(metricsMonitor);
toolbox.addMonitor(metricsMonitor);
// Delay firehose connection to avoid claiming input resources while the plumber is starting up.
final FirehoseFactory firehoseFactory = spec.getIOConfig().getFirehoseFactory();
@ -444,7 +444,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
appenderator.close();
CloseQuietly.close(driver);
toolbox.getMonitorScheduler().removeMonitor(metricsMonitor);
toolbox.removeMonitor(metricsMonitor);
if (appenderatorsManager.shouldTaskMakeNodeAnnouncements()) {
toolbox.getDataSegmentServerAnnouncer().unannounce();

View File

@ -378,7 +378,7 @@ public class RealtimeIndexTask extends AbstractTask
plumber.startJob();
// Set up metrics emission
toolbox.getMonitorScheduler().addMonitor(metricsMonitor);
toolbox.addMonitor(metricsMonitor);
// Delay firehose connection to avoid claiming input resources while the plumber is starting up.
final FirehoseFactory firehoseFactory = spec.getIOConfig().getFirehoseFactory();
@ -473,7 +473,7 @@ public class RealtimeIndexTask extends AbstractTask
if (firehose != null) {
CloseQuietly.close(firehose);
}
toolbox.getMonitorScheduler().removeMonitor(metricsMonitor);
toolbox.removeMonitor(metricsMonitor);
}
}

View File

@ -157,14 +157,12 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e
final FireDepartmentMetrics fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics();
final RowIngestionMeters buildSegmentsMeters = new DropwizardRowIngestionMeters();
if (toolbox.getMonitorScheduler() != null) {
toolbox.getMonitorScheduler().addMonitor(
toolbox.addMonitor(
new RealtimeMetricsMonitor(
Collections.singletonList(fireDepartmentForMetrics),
Collections.singletonMap(DruidMetrics.TASK_ID, new String[]{getId()})
)
);
}
final ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
final PartitionsSpec partitionsSpec = tuningConfig.getGivenOrDefaultPartitionsSpec();

View File

@ -296,14 +296,12 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask
new FireDepartment(dataSchema, new RealtimeIOConfig(null, null), null);
final FireDepartmentMetrics fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics();
if (toolbox.getMonitorScheduler() != null) {
toolbox.getMonitorScheduler().addMonitor(
toolbox.addMonitor(
new RealtimeMetricsMonitor(
Collections.singletonList(fireDepartmentForMetrics),
Collections.singletonMap(DruidMetrics.TASK_ID, new String[]{getId()})
)
);
}
final ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
final DynamicPartitionsSpec partitionsSpec = (DynamicPartitionsSpec) tuningConfig.getGivenOrDefaultPartitionsSpec();

View File

@ -397,8 +397,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
null
);
FireDepartmentMetrics fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics();
toolbox.getMonitorScheduler()
.addMonitor(TaskRealtimeMetricsMonitorBuilder.build(task, fireDepartmentForMetrics, rowIngestionMeters));
toolbox.addMonitor(TaskRealtimeMetricsMonitorBuilder.build(task, fireDepartmentForMetrics, rowIngestionMeters));
final String lookupTier = task.getContextValue(RealtimeIndexTask.CTX_KEY_LOOKUP_TIER);
final LookupNodeService lookupNodeService = lookupTier == null ?

View File

@ -111,7 +111,7 @@ public class TaskToolboxTest
() -> mockQueryRunnerFactoryConglomerate,
mockQueryExecutorService,
NoopJoinableFactory.INSTANCE,
mockMonitorScheduler,
() -> mockMonitorScheduler,
mockSegmentLoaderFactory,
ObjectMapper,
mockIndexIO,

View File

@ -1594,7 +1594,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
() -> conglomerate,
Execs.directExecutor(), // queryExecutorService
NoopJoinableFactory.INSTANCE,
EasyMock.createMock(MonitorScheduler.class),
() -> EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
testUtils.getTestObjectMapper(),
testUtils.getTestIndexIO(),

View File

@ -981,7 +981,7 @@ public class RealtimeIndexTaskTest
() -> conglomerate,
Execs.directExecutor(), // queryExecutorService
NoopJoinableFactory.INSTANCE,
EasyMock.createMock(MonitorScheduler.class),
() -> EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
testUtils.getTestObjectMapper(),
testUtils.getTestIndexIO(),

View File

@ -665,7 +665,7 @@ public class TaskLifecycleTest
() -> queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective
Execs.directExecutor(), // query executor service
NoopJoinableFactory.INSTANCE,
monitorScheduler, // monitor scheduler
() -> monitorScheduler, // monitor scheduler
new SegmentLoaderFactory(null, new DefaultObjectMapper()),
MAPPER,
INDEX_IO,