* add taskStatus dimension to `service/heartbeat` metric

Author: zachjsh
Date: 2024-11-18 15:01:06 -05:00
Parent: 2b4c87fa1c
Commit: cfb02a2813
8 changed files with 40 additions and 4 deletions
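
In effect, the Peon's `service/heartbeat` event gains a `taskStatus` dimension alongside the existing task dimensions. A minimal sketch of the resulting event shape, assuming the standard `ServiceMetricEvent` builder API (illustrative only; the real event is assembled by `ServiceStatusMonitor` from the dimension map that `CliPeon` populates in the final hunk below):

import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;

// Illustrative sketch of the Peon heartbeat after this change, not the
// literal monitor code; "taskStatus" is the newly added dimension.
ServiceMetricEvent.Builder heartbeat = ServiceMetricEvent.builder()
    .setDimension("taskId", task.getId())
    .setDimension("groupId", task.getGroupId())
    .setDimension("taskType", task.getType())
    .setDimension("taskStatus", task.getStatus()) // new in this commit
    .setDimension("dataSource", task.getDataSource());
emitter.emit(heartbeat.setMetric("service/heartbeat", 1));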

View File

@@ -391,9 +391,9 @@ These metrics are emitted by the Druid Coordinator in every run of the corresponding coordinator duty.
### Service Health
|Metric|Description|Dimensions|Normal value|
|------|-----------|----------|------------|
| `service/heartbeat` | Metric indicating the service is up. This metric is emitted only when `ServiceStatusMonitor` is enabled. | `leader` on the Overlord and Coordinator.<br />`workerVersion`, `category`, `status` on the Middle Manager.<br />`taskId`, `groupId`, `taskType`, `dataSource`, `tags` on the Peon |1|
|Metric|Description| Dimensions |Normal value|
|------|-----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------|
| `service/heartbeat` | Metric indicating the service is up. This metric is emitted only when `ServiceStatusMonitor` is enabled. | `leader` on the Overlord and Coordinator.<br />`workerVersion`, `category`, `status` on the Middle Manager.<br />`taskId`, `groupId`, `taskType`, `taskStatus`, `dataSource`, `tags` on the Peon |1|
### Historical

View File

@@ -190,7 +190,7 @@
"namespace/cache/numEntries" : { "dimensions" : [], "type" : "gauge" },
"namespace/cache/heapSizeInBytes" : { "dimensions" : [], "type" : "gauge" },
"service/heartbeat" : { "dimensions" : ["leader"], "type" : "count" },
"service/heartbeat" : { "dimensions" : ["leader", "workerVersion", "category", "status", "taskId", "groupId", "dataSource", "taskStatus" ], "type" : "count" },
"killTask/availableSlot/count" : { "dimensions" : [], "type" : "count" },
"killTask/maxSlot/count" : { "dimensions" : [], "type" : "count" },

View File

@@ -2232,6 +2232,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
)
);
Assert.assertEquals(Status.NOT_STARTED.toString(), task.getStatus());
final ListenableFuture<TaskStatus> future = runTask(task);
// Insert some data, but not enough for the task to finish
@@ -2242,6 +2244,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
}
Assert.assertEquals(2, countEvents(task));
Assert.assertEquals(Status.READING, task.getRunner().getStatus());
Assert.assertEquals(Status.READING.toString(), task.getStatus());
Map<KafkaTopicPartition, Long> currentOffsets = OBJECT_MAPPER.readValue(
task.getRunner().pause().getEntity().toString(),
@@ -2250,6 +2254,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
}
);
Assert.assertEquals(Status.PAUSED, task.getRunner().getStatus());
Assert.assertEquals(Status.PAUSED.toString(), task.getStatus());
// Insert remaining data
insertData(Iterables.skip(records, 4));
@@ -2267,6 +2272,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
Assert.assertEquals(task.getRunner().getEndOffsets(), task.getRunner().getCurrentOffsets());
Assert.assertEquals(Status.PUBLISHING.toString(), task.getStatus());
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(2, 5)).totalProcessed(3));
// Check published metadata and segments in deep storage

View File

@@ -48,6 +48,8 @@ import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -313,6 +315,15 @@
return new TaskIdentifier(this.getId(), this.getGroupId(), this.getType());
}
/**
* @return The status of the task. Note: this interface method is unstable at this time.
*/
@Nullable
default String getStatus()
{
return null;
}
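
Concrete tasks that track their lifecycle can override this default; a hypothetical sketch, where the `state` field is assumed purely for illustration (the real streaming-task override appears in SeekableStreamIndexTask below):

// Hypothetical override for a task that keeps its lifecycle state in a
// field; the interface default above simply reports no status.
@Nullable
@Override
public String getStatus()
{
  return state == null ? null : state.toString();
}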
static TaskInfo<TaskIdentifier, TaskStatus> toTaskIdentifierInfo(TaskInfo<Task, TaskStatus> taskInfo)
{
return new TaskInfo<>(

View File

@@ -180,6 +180,13 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>
return (queryPlus, responseContext) -> queryPlus.run(getRunner().getAppenderator(), responseContext);
}
@Nullable
@Override
public String getStatus()
{
return (getRunner() != null && getRunner().getStatus() != null) ? getRunner().getStatus().toString() : null;
}
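Note the double guard here: the runner exists only once the task is actually running, so until then the override degrades to the interface default (null) rather than throwing.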
public Appenderator newAppenderator(
TaskToolbox toolbox,
SegmentGenerationMetrics metrics,

View File

@@ -305,4 +305,11 @@ public class AbstractTaskTest
Assert.assertEquals(AbstractTask.IngestionMode.NONE, ingestionMode);
}
@Test
public void testGetStatus()
{
AbstractTask task = new NoopTask("myID", null, null, 1, 0, null);
Assert.assertNull(task.getStatus());
}
}

View File

@@ -1546,6 +1546,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
"0"
);
Assert.assertNull(id1.getStatus());
final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
Collection workItems = new ArrayList<>();

View File

@@ -308,6 +308,9 @@ public class CliPeon extends GuiceRunnable
builder.put(DruidMetrics.DATASOURCE, task.getDataSource());
builder.put(DruidMetrics.TASK_TYPE, task.getType());
builder.put(DruidMetrics.GROUP_ID, task.getGroupId());
if (task.getStatus() != null) {
builder.put(DruidMetrics.TASK_STATUS, task.getStatus());
}
Map<String, Object> tags = task.getContextValue(DruidMetrics.TAGS);
if (tags != null && !tags.isEmpty()) {
builder.put(DruidMetrics.TAGS, tags);
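
With this wiring, every Peon heartbeat is tagged with the task's current status whenever one is available. As a usage note, `service/heartbeat` is emitted at all only when `ServiceStatusMonitor` is included in the service's `druid.monitoring.monitors` list, per the documentation change above.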