diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 5dbdc8d5dd7..d4eb951cf74 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -237,7 +237,7 @@ Note: If the JVM does not support CPU time measurement for the current thread, i |`task/run/time`|Milliseconds taken to run a task.| dataSource, taskId, taskType, taskStatus. |Varies.| |`task/pending/time`|Milliseconds taken for a task to wait for running.| dataSource, taskId, taskType. |Varies.| |`task/action/log/time`|Milliseconds taken to log a task action to the audit log.| dataSource, taskId, taskType |< 1000 (subsecond)| -|`task/action/run/time`|Milliseconds taken to execute a task action.| dataSource, taskId, taskType |Varies from subsecond to a few seconds, based on action type.| +|`task/action/run/time`|Milliseconds taken to execute a task action.| dataSource, taskId, taskType, taskActionType |Varies from subsecond to a few seconds, based on action type.| |`segment/added/bytes`|Size in bytes of new segments created.| dataSource, taskId, taskType, interval. |Varies.| |`segment/moved/bytes`|Size in bytes of segments moved/archived via the Move Task.| dataSource, taskId, taskType, interval. |Varies.| |`segment/nuked/bytes`|Size in bytes of segments deleted via the Kill Task.| dataSource, taskId, taskType, interval. |Varies.| diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 36ddea4c36e..a83bbaf5153 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -3121,7 +3121,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase ); return true; } - } + }, + objectMapper ); final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory( taskStorage, diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index c59c479569f..553b601f7fc 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -3085,7 +3085,8 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase ); return true; } - } + }, + objectMapper ); final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory( taskStorage, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java index 8c7cb533aeb..27e0bcbaa29 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java @@ -19,13 +19,18 @@ package org.apache.druid.indexing.common.actions; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import javax.annotation.Nullable; +import java.util.Map; + public class LocalTaskActionClient implements TaskActionClient { private static final EmittingLogger log = new EmittingLogger(LocalTaskActionClient.class); @@ -58,7 +63,7 @@ public class LocalTaskActionClient implements TaskActionClient try { final long auditLogStartTime = System.currentTimeMillis(); storage.addAuditLog(task, taskAction); - emitTimerMetric("task/action/log/time", System.currentTimeMillis() - auditLogStartTime); + emitTimerMetric("task/action/log/time", taskAction, System.currentTimeMillis() - auditLogStartTime); } catch (Exception e) { final String actionClass = taskAction.getClass().getName(); @@ -72,14 +77,35 @@ public class LocalTaskActionClient implements TaskActionClient final long performStartTime = System.currentTimeMillis(); final RetType result = taskAction.perform(task, toolbox); - emitTimerMetric("task/action/run/time", System.currentTimeMillis() - performStartTime); + emitTimerMetric("task/action/run/time", taskAction, System.currentTimeMillis() - performStartTime); return result; } - private void emitTimerMetric(final String metric, final long time) + private void emitTimerMetric(final String metric, final TaskAction action, final long time) { final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder(); IndexTaskUtils.setTaskDimensions(metricBuilder, task); + final String actionType = getActionType(toolbox.getJsonMapper(), action); + if (actionType != null) { + metricBuilder.setDimension("taskActionType", actionType); + } toolbox.getEmitter().emit(metricBuilder.build(metric, Math.max(0, time))); } + + @Nullable + static String getActionType(final ObjectMapper jsonMapper, final TaskAction action) + { + try { + final Map m = jsonMapper.convertValue(action, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT); + final Object typeObject = m.get(TaskAction.TYPE_FIELD); + if (typeObject instanceof String) { + return (String) typeObject; + } else { + return null; + } + } + catch (Exception e) { + return null; + } + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java index 02e5ef50129..559039d96ef 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java @@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.core.type.TypeReference; import org.apache.druid.indexing.common.task.Task; -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = TaskAction.TYPE_FIELD) @JsonSubTypes(value = { @JsonSubTypes.Type(name = "lockAcquire", value = TimeChunkLockAcquireAction.class), @JsonSubTypes.Type(name = "lockTryAcquire", value = TimeChunkLockTryAcquireAction.class), @@ -50,8 +50,12 @@ import org.apache.druid.indexing.common.task.Task; }) public interface TaskAction { + String TYPE_FIELD = "type"; + TypeReference getReturnTypeReference(); // T_T + RetType perform(Task task, TaskActionToolbox toolbox); + boolean isAudited(); @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionToolbox.java index 61c6180f4a9..134a9bf6c72 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionToolbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionToolbox.java @@ -19,8 +19,10 @@ package org.apache.druid.indexing.common.actions; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.inject.Inject; +import org.apache.druid.guice.annotations.Json; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskRunner; @@ -36,6 +38,7 @@ public class TaskActionToolbox private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator; private final ServiceEmitter emitter; private final SupervisorManager supervisorManager; + private final ObjectMapper jsonMapper; private Optional factory = Optional.absent(); @Inject @@ -44,7 +47,8 @@ public class TaskActionToolbox TaskStorage taskStorage, IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, ServiceEmitter emitter, - SupervisorManager supervisorManager + SupervisorManager supervisorManager, + @Json ObjectMapper jsonMapper ) { this.taskLockbox = taskLockbox; @@ -52,6 +56,7 @@ public class TaskActionToolbox this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator; this.emitter = emitter; this.supervisorManager = supervisorManager; + this.jsonMapper = jsonMapper; } public TaskLockbox getTaskLockbox() @@ -79,6 +84,11 @@ public class TaskActionToolbox return supervisorManager; } + public ObjectMapper getJsonMapper() + { + return jsonMapper; + } + @Inject(optional = true) public void setTaskRunnerFactory(TaskRunnerFactory factory) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/LocalTaskActionClientTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/LocalTaskActionClientTest.java new file mode 100644 index 00000000000..55307984824 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/LocalTaskActionClientTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.actions; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; + +public class LocalTaskActionClientTest +{ + private final ObjectMapper objectMapper = new DefaultObjectMapper(); + + @Test + public void testGetActionType() + { + final TaskAction action = SegmentTransactionalInsertAction.appendAction(Collections.emptySet(), null, null); + Assert.assertEquals("segmentTransactionalInsert", LocalTaskActionClient.getActionType(objectMapper, action)); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java index ea2c854c3e2..2d6b22732a6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java @@ -104,7 +104,8 @@ public class TaskActionTestKit extends ExternalResource taskStorage, metadataStorageCoordinator, new NoopServiceEmitter(), - EasyMock.createMock(SupervisorManager.class) + EasyMock.createMock(SupervisorManager.class), + objectMapper ); testDerbyConnector.createDataSourceTable(); testDerbyConnector.createPendingSegmentsTable(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionToolboxTest.java index 232239ce9ba..d58c1f5df1a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionToolboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionToolboxTest.java @@ -34,7 +34,7 @@ public class TaskActionToolboxTest @Test public void testMakeCodeCoverageHappy() { - TaskActionToolbox toolbox = new TaskActionToolbox(null, null, null, null, null); + TaskActionToolbox toolbox = new TaskActionToolbox(null, null, null, null, null, null); assertFalse(toolbox.getTaskRunner().isPresent()); ForkingTaskRunnerFactory factory = mock(ForkingTaskRunnerFactory.class); when(factory.get()).thenReturn(mock(ForkingTaskRunner.class)); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index f2a742bdb25..7bff995578d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -1574,7 +1574,8 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand taskStorage, mdc, EMITTER, - EasyMock.createMock(SupervisorManager.class) + EasyMock.createMock(SupervisorManager.class), + OBJECT_MAPPER ); final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory( taskStorage, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java index 2d3cb8dbad5..599f1ffc188 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java @@ -206,7 +206,8 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest taskStorage, storageCoordinator, new NoopServiceEmitter(), - null + null, + objectMapper ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java index d571bd0bc47..83b8a7bf605 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -915,7 +915,8 @@ public class RealtimeIndexTaskTest extends InitializedNullHandlingTest taskStorage, mdc, EMITTER, - EasyMock.createMock(SupervisorManager.class) + EasyMock.createMock(SupervisorManager.class), + new DefaultObjectMapper() ); final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory( taskStorage, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 83cb962874b..443f2c6d1e0 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -598,7 +598,8 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest taskStorage, mdc, emitter, - EasyMock.createMock(SupervisorManager.class) + EasyMock.createMock(SupervisorManager.class), + mapper ), new TaskAuditLogConfig(true) ); diff --git a/website/.spelling b/website/.spelling index 30300bd6a48..836d03c243c 100644 --- a/website/.spelling +++ b/website/.spelling @@ -1631,6 +1631,7 @@ poolName remoteAddress segmentAvailabilityConfirmed serviceName +taskActionType taskIngestionMode taskStatus taskType