Add a metric for task duration in the pending queue (#12492)

This PR measures how long a task stays in the pending queue and emits that value as the metric task/pending/time. The metric is measured in RemoteTaskRunner and HttpRemoteTaskRunner.

An example of the metric:

```
2022-04-26T21:59:09,488 INFO [rtr-pending-tasks-runner-0] org.apache.druid.java.util.emitter.core.LoggingEmitter - {"feed":"metrics","timestamp":"2022-04-26T21:59:09.487Z","service":"druid/coordinator","host":"localhost:8081","version":"2022.02.0-iap-SNAPSHOT","metric":"task/pending/time","value":8,"dataSource":"wikipedia","taskId":"index_parallel_wikipedia_gecpcglg_2022-04-26T21:59:09.432Z","taskType":"index_parallel"}
```

------------------------------------------
Key changed/added classes in this PR

    Emit metric task/pending/time in classes RemoteTaskRunner and HttpRemoteTaskRunner.
    Update related factory classes and tests.
This commit is contained in:
Rocky Chen 2022-05-02 20:47:25 -07:00 committed by GitHub
parent 785a1eeb9f
commit 770ad95169
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 146 additions and 17 deletions

View File

@ -198,6 +198,7 @@ Note: If the JVM does not support CPU time measurement for the current thread, i
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`task/run/time`|Milliseconds taken to run a task.|dataSource, taskId, taskType, taskStatus.|Varies.|
|`task/pending/time`|Milliseconds a task spends waiting in the pending queue before it starts running.|dataSource, taskId, taskType.|Varies.|
|`task/action/log/time`|Milliseconds taken to log a task action to the audit log.|dataSource, taskId, taskType|< 1000 (subsecond)|
|`task/action/run/time`|Milliseconds taken to execute a task action.|dataSource, taskId, taskType|Varies from subsecond to a few seconds, based on action type.|
|`segment/added/bytes`|Size in bytes of new segments created.|dataSource, taskId, taskType, interval.|Varies.|

View File

@ -54,6 +54,7 @@ import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
@ -74,6 +75,8 @@ import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
@ -179,6 +182,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups = new ConcurrentHashMap<>();
private final ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy;
private final ServiceEmitter emitter;
private ProvisioningService provisioningService;
public RemoteTaskRunner(
@ -189,7 +193,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
PathChildrenCacheFactory.Builder pathChildrenCacheFactory,
HttpClient httpClient,
Supplier<WorkerBehaviorConfig> workerConfigRef,
ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy
ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy,
ServiceEmitter emitter
)
{
this.jsonMapper = jsonMapper;
@ -213,6 +218,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
config.getPendingTasksRunnerNumThreads(),
"rtr-pending-tasks-runner-%d"
);
this.emitter = emitter;
}
@Override
@ -934,6 +940,13 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
return false;
}
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task);
emitter.emit(metricBuilder.build(
"task/pending/time",
new Duration(workItem.getQueueInsertionTime(), DateTimes.nowUtc()).getMillis())
);
RemoteTaskRunnerWorkItem newWorkItem = workItem.withWorker(theZkWorker.getWorker(), null);
runningTasks.put(task.getId(), newWorkItem);
log.info("Task %s switched from pending to running (on [%s])", task.getId(), newWorkItem.getWorker().getHost());
@ -1516,6 +1529,12 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
return workersWithUnacknowledgedTask;
}
@VisibleForTesting
ProvisioningStrategy<WorkerTaskRunner> getProvisioningStrategy()
{
return provisioningStrategy;
}
@Override
public Map<String, Long> getTotalTaskSlotCount()
{

View File

@ -30,6 +30,7 @@ import org.apache.druid.indexing.overlord.autoscaling.ProvisioningSchedulerConfi
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.server.initialization.IndexerZkConfig;
@ -46,6 +47,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunn
private final Supplier<WorkerBehaviorConfig> workerConfigRef;
private final ProvisioningSchedulerConfig provisioningSchedulerConfig;
private final ProvisioningStrategy provisioningStrategy;
private final ServiceEmitter emitter;
@Inject
public RemoteTaskRunnerFactory(
@ -56,7 +58,8 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunn
@EscalatedGlobal final HttpClient httpClient,
final Supplier<WorkerBehaviorConfig> workerConfigRef,
final ProvisioningSchedulerConfig provisioningSchedulerConfig,
final ProvisioningStrategy provisioningStrategy
final ProvisioningStrategy provisioningStrategy,
final ServiceEmitter emitter
)
{
this.curator = curator;
@ -67,6 +70,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunn
this.workerConfigRef = workerConfigRef;
this.provisioningSchedulerConfig = provisioningSchedulerConfig;
this.provisioningStrategy = provisioningStrategy;
this.emitter = emitter;
}
@Override
@ -80,7 +84,8 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory<RemoteTaskRunn
new PathChildrenCacheFactory.Builder().withCompressed(true),
httpClient,
workerConfigRef,
provisioningSchedulerConfig.isDoAutoscale() ? provisioningStrategy : new NoopProvisioningStrategy<>()
provisioningSchedulerConfig.isDoAutoscale() ? provisioningStrategy : new NoopProvisioningStrategy<>(),
emitter
);
}
}

View File

@ -50,6 +50,7 @@ import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.RemoteTaskRunnerWorkItem;
@ -76,6 +77,8 @@ import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
@ -83,6 +86,7 @@ import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.apache.zookeeper.KeeperException;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Duration;
import org.joda.time.Period;
import javax.annotation.Nullable;
@ -182,6 +186,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
private final HttpRemoteTaskRunnerConfig config;
private final TaskStorage taskStorage;
private final ServiceEmitter emitter;
// ZK_CLEANUP_TODO : Remove these when RemoteTaskRunner and WorkerTaskMonitor are removed.
private static final Joiner JOINER = Joiner.on("/");
@ -203,7 +208,8 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
TaskStorage taskStorage,
@Nullable CuratorFramework cf,
IndexerZkConfig indexerZkConfig
IndexerZkConfig indexerZkConfig,
ServiceEmitter emitter
)
{
this.smileMapper = smileMapper;
@ -212,6 +218,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider;
this.taskStorage = taskStorage;
this.workerConfigRef = workerConfigRef;
this.emitter = emitter;
this.pendingTasksExec = Execs.multiThreaded(
config.getPendingTasksRunnerNumThreads(),
@ -1548,6 +1555,14 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
taskItem.setWorker(worker);
taskItem.setState(HttpRemoteTaskRunnerWorkItem.State.RUNNING);
log.info("Task[%s] started RUNNING on worker[%s].", taskId, worker.getHost());
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, taskItem.getTask());
emitter.emit(metricBuilder.build(
"task/pending/time",
new Duration(taskItem.getCreatedTime(), DateTimes.nowUtc()).getMillis())
);
// fall through
case RUNNING:
if (worker.getHost().equals(taskItem.getWorker().getHost())) {

View File

@ -35,6 +35,7 @@ import org.apache.druid.indexing.overlord.autoscaling.ProvisioningSchedulerConfi
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.server.initialization.IndexerZkConfig;
@ -54,6 +55,7 @@ public class HttpRemoteTaskRunnerFactory implements TaskRunnerFactory<HttpRemote
private final ProvisioningStrategy provisioningStrategy;
private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
private final TaskStorage taskStorage;
private final ServiceEmitter emitter;
// ZK_CLEANUP_TODO : Remove these when RemoteTaskRunner and WorkerTaskMonitor are removed.
@Nullable //Null if zk is disabled
@ -72,7 +74,8 @@ public class HttpRemoteTaskRunnerFactory implements TaskRunnerFactory<HttpRemote
final TaskStorage taskStorage,
final Provider<CuratorFramework> cfProvider,
final IndexerZkConfig indexerZkConfig,
final ZkEnablementConfig zkEnablementConfig
final ZkEnablementConfig zkEnablementConfig,
final ServiceEmitter emitter
)
{
this.smileMapper = smileMapper;
@ -84,6 +87,7 @@ public class HttpRemoteTaskRunnerFactory implements TaskRunnerFactory<HttpRemote
this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider;
this.taskStorage = taskStorage;
this.indexerZkConfig = indexerZkConfig;
this.emitter = emitter;
if (zkEnablementConfig.isEnabled()) {
this.cf = cfProvider.get();
@ -104,7 +108,8 @@ public class HttpRemoteTaskRunnerFactory implements TaskRunnerFactory<HttpRemote
druidNodeDiscoveryProvider,
taskStorage,
cf,
indexerZkConfig
indexerZkConfig,
emitter
);
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord;
import org.apache.curator.framework.CuratorFramework;
import org.apache.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningSchedulerConfig;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
/**
 * Checks which provisioning strategy ends up on the runner produced by
 * {@link RemoteTaskRunnerFactory#build()}, depending on whether the mocked
 * {@link ProvisioningSchedulerConfig} reports autoscaling as enabled.
 */
public class RemoteTaskRunnerFactoryTest
{
  /**
   * Autoscaling on: the factory is constructed with a {@code null} strategy, and that
   * same {@code null} is expected on the built runner.
   */
  @Test
  public void testBuildWithAutoScale()
  {
    final RemoteTaskRunnerFactory factory = newFactoryWithAutoscale(true);
    final Object strategy = factory.build().getProvisioningStrategy();
    Assert.assertNull(strategy);
  }

  /**
   * Autoscaling off: the built runner is expected to carry a
   * {@link NoopProvisioningStrategy} regardless of the strategy given to the factory.
   */
  @Test
  public void testBuildWithoutAutoScale()
  {
    final RemoteTaskRunnerFactory factory = newFactoryWithAutoscale(false);
    final Object strategy = factory.build().getProvisioningStrategy();
    Assert.assertTrue(strategy instanceof NoopProvisioningStrategy);
  }

  /**
   * Creates a factory wired with mocks and {@code null} collaborators; only the
   * autoscale flag of the scheduler config differs between callers.
   *
   * @param doAutoscale value the mocked config returns from {@code isDoAutoscale()}
   * @return a factory suitable for calling {@code build()} in these tests
   */
  private RemoteTaskRunnerFactory newFactoryWithAutoscale(boolean doAutoscale)
  {
    final ProvisioningSchedulerConfig schedulerConfig = Mockito.mock(ProvisioningSchedulerConfig.class);
    Mockito.when(schedulerConfig.isDoAutoscale()).thenReturn(doAutoscale);

    // NOTE(review): presumably the runner asks the curator for a watcher-removing
    // framework while being constructed; the stub keeps that call inert - confirm.
    final CuratorFramework curatorMock = Mockito.mock(CuratorFramework.class);
    Mockito.when(curatorMock.newWatcherRemoveCuratorFramework()).thenReturn(null);

    final RemoteTaskRunnerConfig runnerConfig = new RemoteTaskRunnerConfig();
    final IndexerZkConfig zkConfig = new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null);

    return new RemoteTaskRunnerFactory(
        curatorMock,
        runnerConfig,
        zkConfig,
        null,
        null,
        null,
        schedulerConfig,
        null,
        null
    );
  }
}

View File

@ -47,6 +47,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.zookeeper.CreateMode;
import java.util.concurrent.atomic.AtomicReference;
@ -270,7 +271,8 @@ public class RemoteTaskRunnerTestUtils
pathChildrenCacheFactory,
httpClient,
workerConfigRef,
provisioningStrategy
provisioningStrategy,
new NoopServiceEmitter()
);
}

View File

@ -230,7 +230,8 @@ public class HttpRemoteTaskRunnerTest
druidNodeDiscoveryProvider,
EasyMock.createNiceMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
new NoopServiceEmitter()
)
{
@Override
@ -298,7 +299,8 @@ public class HttpRemoteTaskRunnerTest
druidNodeDiscoveryProvider,
EasyMock.createNiceMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
new NoopServiceEmitter()
)
{
@Override
@ -402,7 +404,8 @@ public class HttpRemoteTaskRunnerTest
druidNodeDiscoveryProvider,
taskStorageMock,
EasyMock.createNiceMock(CuratorFramework.class),
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
new NoopServiceEmitter()
)
{
@Override
@ -544,7 +547,8 @@ public class HttpRemoteTaskRunnerTest
druidNodeDiscoveryProvider,
EasyMock.createNiceMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
new NoopServiceEmitter()
)
{
@Override
@ -719,7 +723,8 @@ public class HttpRemoteTaskRunnerTest
druidNodeDiscoveryProvider,
EasyMock.createNiceMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
new NoopServiceEmitter()
)
{
@Override
@ -916,7 +921,8 @@ public class HttpRemoteTaskRunnerTest
druidNodeDiscoveryProvider,
EasyMock.createNiceMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
new NoopServiceEmitter()
)
{
@Override
@ -1405,7 +1411,8 @@ public class HttpRemoteTaskRunnerTest
druidNodeDiscoveryProvider,
EasyMock.createNiceMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
new NoopServiceEmitter()
)
{
@Override
@ -1517,7 +1524,8 @@ public class HttpRemoteTaskRunnerTest
druidNodeDiscoveryProvider,
EasyMock.createNiceMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
new NoopServiceEmitter()
)
{
@Override
@ -1625,7 +1633,8 @@ public class HttpRemoteTaskRunnerTest
druidNodeDiscoveryProvider,
taskStorage,
EasyMock.createNiceMock(CuratorFramework.class),
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
new NoopServiceEmitter()
);
taskRunner.start();
@ -1896,7 +1905,8 @@ public class HttpRemoteTaskRunnerTest
druidNodeDiscoveryProvider,
EasyMock.createNiceMock(TaskStorage.class),
EasyMock.createNiceMock(CuratorFramework.class),
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
new NoopServiceEmitter()
)
{
@Override