Add taskActionType dimension to task/action/run/time. (#13333)

* Add taskActionType dimension to task/action/run/time.

* Spelling.
This commit is contained in:
Gian Merlino 2022-11-10 22:30:08 -08:00 committed by GitHub
parent 03175a2b8d
commit 77478f25fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 101 additions and 14 deletions

View File

@ -237,7 +237,7 @@ Note: If the JVM does not support CPU time measurement for the current thread, i
|`task/run/time`|Milliseconds taken to run a task.| dataSource, taskId, taskType, taskStatus. |Varies.| |`task/run/time`|Milliseconds taken to run a task.| dataSource, taskId, taskType, taskStatus. |Varies.|
|`task/pending/time`|Milliseconds taken for a task to wait for running.| dataSource, taskId, taskType. |Varies.| |`task/pending/time`|Milliseconds taken for a task to wait for running.| dataSource, taskId, taskType. |Varies.|
|`task/action/log/time`|Milliseconds taken to log a task action to the audit log.| dataSource, taskId, taskType |< 1000 (subsecond)| |`task/action/log/time`|Milliseconds taken to log a task action to the audit log.| dataSource, taskId, taskType |< 1000 (subsecond)|
|`task/action/run/time`|Milliseconds taken to execute a task action.| dataSource, taskId, taskType |Varies from subsecond to a few seconds, based on action type.| |`task/action/run/time`|Milliseconds taken to execute a task action.| dataSource, taskId, taskType, taskActionType |Varies from subsecond to a few seconds, based on action type.|
|`segment/added/bytes`|Size in bytes of new segments created.| dataSource, taskId, taskType, interval. |Varies.| |`segment/added/bytes`|Size in bytes of new segments created.| dataSource, taskId, taskType, interval. |Varies.|
|`segment/moved/bytes`|Size in bytes of segments moved/archived via the Move Task.| dataSource, taskId, taskType, interval. |Varies.| |`segment/moved/bytes`|Size in bytes of segments moved/archived via the Move Task.| dataSource, taskId, taskType, interval. |Varies.|
|`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.| dataSource, taskId, taskType, interval. |Varies.| |`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.| dataSource, taskId, taskType, interval. |Varies.|

View File

@ -3121,7 +3121,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
); );
return true; return true;
} }
} },
objectMapper
); );
final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory( final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory(
taskStorage, taskStorage,

View File

@ -3085,7 +3085,8 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
); );
return true; return true;
} }
} },
objectMapper
); );
final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory( final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory(
taskStorage, taskStorage,

View File

@ -19,13 +19,18 @@
package org.apache.druid.indexing.common.actions; package org.apache.druid.indexing.common.actions;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import javax.annotation.Nullable;
import java.util.Map;
public class LocalTaskActionClient implements TaskActionClient public class LocalTaskActionClient implements TaskActionClient
{ {
private static final EmittingLogger log = new EmittingLogger(LocalTaskActionClient.class); private static final EmittingLogger log = new EmittingLogger(LocalTaskActionClient.class);
@ -58,7 +63,7 @@ public class LocalTaskActionClient implements TaskActionClient
try { try {
final long auditLogStartTime = System.currentTimeMillis(); final long auditLogStartTime = System.currentTimeMillis();
storage.addAuditLog(task, taskAction); storage.addAuditLog(task, taskAction);
emitTimerMetric("task/action/log/time", System.currentTimeMillis() - auditLogStartTime); emitTimerMetric("task/action/log/time", taskAction, System.currentTimeMillis() - auditLogStartTime);
} }
catch (Exception e) { catch (Exception e) {
final String actionClass = taskAction.getClass().getName(); final String actionClass = taskAction.getClass().getName();
@ -72,14 +77,35 @@ public class LocalTaskActionClient implements TaskActionClient
final long performStartTime = System.currentTimeMillis(); final long performStartTime = System.currentTimeMillis();
final RetType result = taskAction.perform(task, toolbox); final RetType result = taskAction.perform(task, toolbox);
emitTimerMetric("task/action/run/time", System.currentTimeMillis() - performStartTime); emitTimerMetric("task/action/run/time", taskAction, System.currentTimeMillis() - performStartTime);
return result; return result;
} }
private void emitTimerMetric(final String metric, final long time) private void emitTimerMetric(final String metric, final TaskAction<?> action, final long time)
{ {
final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder(); final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task); IndexTaskUtils.setTaskDimensions(metricBuilder, task);
final String actionType = getActionType(toolbox.getJsonMapper(), action);
if (actionType != null) {
metricBuilder.setDimension("taskActionType", actionType);
}
toolbox.getEmitter().emit(metricBuilder.build(metric, Math.max(0, time))); toolbox.getEmitter().emit(metricBuilder.build(metric, Math.max(0, time)));
} }
@Nullable
static String getActionType(final ObjectMapper jsonMapper, final TaskAction<?> action)
{
try {
final Map<String, Object> m = jsonMapper.convertValue(action, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT);
final Object typeObject = m.get(TaskAction.TYPE_FIELD);
if (typeObject instanceof String) {
return (String) typeObject;
} else {
return null;
}
}
catch (Exception e) {
return null;
}
}
} }

View File

@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Task;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = TaskAction.TYPE_FIELD)
@JsonSubTypes(value = { @JsonSubTypes(value = {
@JsonSubTypes.Type(name = "lockAcquire", value = TimeChunkLockAcquireAction.class), @JsonSubTypes.Type(name = "lockAcquire", value = TimeChunkLockAcquireAction.class),
@JsonSubTypes.Type(name = "lockTryAcquire", value = TimeChunkLockTryAcquireAction.class), @JsonSubTypes.Type(name = "lockTryAcquire", value = TimeChunkLockTryAcquireAction.class),
@ -50,8 +50,12 @@ import org.apache.druid.indexing.common.task.Task;
}) })
public interface TaskAction<RetType> public interface TaskAction<RetType>
{ {
String TYPE_FIELD = "type";
TypeReference<RetType> getReturnTypeReference(); // T_T TypeReference<RetType> getReturnTypeReference(); // T_T
RetType perform(Task task, TaskActionToolbox toolbox); RetType perform(Task task, TaskActionToolbox toolbox);
boolean isAudited(); boolean isAudited();
@Override @Override

View File

@ -19,8 +19,10 @@
package org.apache.druid.indexing.common.actions; package org.apache.druid.indexing.common.actions;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.TaskRunner;
@ -36,6 +38,7 @@ public class TaskActionToolbox
private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator; private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
private final ServiceEmitter emitter; private final ServiceEmitter emitter;
private final SupervisorManager supervisorManager; private final SupervisorManager supervisorManager;
private final ObjectMapper jsonMapper;
private Optional<TaskRunnerFactory> factory = Optional.absent(); private Optional<TaskRunnerFactory> factory = Optional.absent();
@Inject @Inject
@ -44,7 +47,8 @@ public class TaskActionToolbox
TaskStorage taskStorage, TaskStorage taskStorage,
IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
ServiceEmitter emitter, ServiceEmitter emitter,
SupervisorManager supervisorManager SupervisorManager supervisorManager,
@Json ObjectMapper jsonMapper
) )
{ {
this.taskLockbox = taskLockbox; this.taskLockbox = taskLockbox;
@ -52,6 +56,7 @@ public class TaskActionToolbox
this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator; this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator;
this.emitter = emitter; this.emitter = emitter;
this.supervisorManager = supervisorManager; this.supervisorManager = supervisorManager;
this.jsonMapper = jsonMapper;
} }
public TaskLockbox getTaskLockbox() public TaskLockbox getTaskLockbox()
@ -79,6 +84,11 @@ public class TaskActionToolbox
return supervisorManager; return supervisorManager;
} }
public ObjectMapper getJsonMapper()
{
return jsonMapper;
}
@Inject(optional = true) @Inject(optional = true)
public void setTaskRunnerFactory(TaskRunnerFactory factory) public void setTaskRunnerFactory(TaskRunnerFactory factory)
{ {

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.actions;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
public class LocalTaskActionClientTest
{
private final ObjectMapper objectMapper = new DefaultObjectMapper();
@Test
public void testGetActionType()
{
final TaskAction<?> action = SegmentTransactionalInsertAction.appendAction(Collections.emptySet(), null, null);
Assert.assertEquals("segmentTransactionalInsert", LocalTaskActionClient.getActionType(objectMapper, action));
}
}

View File

@ -104,7 +104,8 @@ public class TaskActionTestKit extends ExternalResource
taskStorage, taskStorage,
metadataStorageCoordinator, metadataStorageCoordinator,
new NoopServiceEmitter(), new NoopServiceEmitter(),
EasyMock.createMock(SupervisorManager.class) EasyMock.createMock(SupervisorManager.class),
objectMapper
); );
testDerbyConnector.createDataSourceTable(); testDerbyConnector.createDataSourceTable();
testDerbyConnector.createPendingSegmentsTable(); testDerbyConnector.createPendingSegmentsTable();

View File

@ -34,7 +34,7 @@ public class TaskActionToolboxTest
@Test @Test
public void testMakeCodeCoverageHappy() public void testMakeCodeCoverageHappy()
{ {
TaskActionToolbox toolbox = new TaskActionToolbox(null, null, null, null, null); TaskActionToolbox toolbox = new TaskActionToolbox(null, null, null, null, null, null);
assertFalse(toolbox.getTaskRunner().isPresent()); assertFalse(toolbox.getTaskRunner().isPresent());
ForkingTaskRunnerFactory factory = mock(ForkingTaskRunnerFactory.class); ForkingTaskRunnerFactory factory = mock(ForkingTaskRunnerFactory.class);
when(factory.get()).thenReturn(mock(ForkingTaskRunner.class)); when(factory.get()).thenReturn(mock(ForkingTaskRunner.class));

View File

@ -1574,7 +1574,8 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
taskStorage, taskStorage,
mdc, mdc,
EMITTER, EMITTER,
EasyMock.createMock(SupervisorManager.class) EasyMock.createMock(SupervisorManager.class),
OBJECT_MAPPER
); );
final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory( final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory(
taskStorage, taskStorage,

View File

@ -206,7 +206,8 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
taskStorage, taskStorage,
storageCoordinator, storageCoordinator,
new NoopServiceEmitter(), new NoopServiceEmitter(),
null null,
objectMapper
); );
} }

View File

@ -915,7 +915,8 @@ public class RealtimeIndexTaskTest extends InitializedNullHandlingTest
taskStorage, taskStorage,
mdc, mdc,
EMITTER, EMITTER,
EasyMock.createMock(SupervisorManager.class) EasyMock.createMock(SupervisorManager.class),
new DefaultObjectMapper()
); );
final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory( final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory(
taskStorage, taskStorage,

View File

@ -598,7 +598,8 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
taskStorage, taskStorage,
mdc, mdc,
emitter, emitter,
EasyMock.createMock(SupervisorManager.class) EasyMock.createMock(SupervisorManager.class),
mapper
), ),
new TaskAuditLogConfig(true) new TaskAuditLogConfig(true)
); );

View File

@ -1631,6 +1631,7 @@ poolName
remoteAddress remoteAddress
segmentAvailabilityConfirmed segmentAvailabilityConfirmed
serviceName serviceName
taskActionType
taskIngestionMode taskIngestionMode
taskStatus taskStatus
taskType taskType