mirror of https://github.com/apache/druid.git
Add new metric that quantifies how long batch ingest jobs waited for segment availability and whether or not that wait was successful (#12002)
* add a unit test that tests that new metric is emitted * remove unused import * clarify in doc that this is for batch tasks * fix IndexTaskTest
This commit is contained in:
parent
ffa4783ce8
commit
761fe9f144
|
@ -213,6 +213,7 @@ Note: If the JVM does not support CPU time measurement for the current thread, i
|
|||
|`taskSlot/used/count`|Number of busy task slots per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
|
||||
|`taskSlot/lazy/count`|Number of total task slots in lazy marked MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
|
||||
|`taskSlot/blacklisted/count`|Number of total task slots in blacklisted MiddleManagers and Indexers per emission period. This metric is only available if the TaskSlotCountStatsMonitor module is included.|category.|Varies.|
|
||||
|`task/segmentAvailability/wait/time`|The amount of milliseconds a batch indexing task waited for newly created segments to become available for querying.|dataSource, taskType, taskId, segmentAvailabilityConfirmed|Varies.|
|
||||
|
||||
## Shuffle metrics (Native parallel task)
|
||||
|
||||
|
|
|
@ -53,6 +53,7 @@ import org.apache.druid.java.util.common.granularity.Granularity;
|
|||
import org.apache.druid.java.util.common.granularity.GranularityType;
|
||||
import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
|
||||
import org.apache.druid.java.util.common.logger.Logger;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
|
||||
import org.apache.druid.query.SegmentDescriptor;
|
||||
import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
|
||||
import org.apache.druid.segment.incremental.ParseExceptionHandler;
|
||||
|
@ -674,6 +675,14 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
|
|||
}
|
||||
finally {
|
||||
segmentAvailabilityWaitTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
||||
toolbox.getEmitter().emit(
|
||||
new ServiceMetricEvent.Builder()
|
||||
.setDimension("dataSource", getDataSource())
|
||||
.setDimension("taskType", getType())
|
||||
.setDimension("taskId", getId())
|
||||
.setDimension("segmentAvailabilityConfirmed", segmentAvailabilityConfirmationCompleted)
|
||||
.build("task/segmentAvailability/wait/time", segmentAvailabilityWaitTimeMs)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -61,6 +61,8 @@ import org.apache.druid.java.util.common.StringUtils;
|
|||
import org.apache.druid.java.util.common.granularity.Granularities;
|
||||
import org.apache.druid.java.util.common.granularity.Granularity;
|
||||
import org.apache.druid.java.util.common.guava.Sequence;
|
||||
import org.apache.druid.java.util.emitter.core.Event;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
||||
import org.apache.druid.math.expr.ExprMacroTable;
|
||||
import org.apache.druid.query.aggregation.AggregatorFactory;
|
||||
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
|
||||
|
@ -90,6 +92,7 @@ import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
|
|||
import org.apache.druid.segment.realtime.plumber.NoopSegmentHandoffNotifierFactory;
|
||||
import org.apache.druid.segment.transform.ExpressionTransform;
|
||||
import org.apache.druid.segment.transform.TransformSpec;
|
||||
import org.apache.druid.server.metrics.NoopServiceEmitter;
|
||||
import org.apache.druid.timeline.DataSegment;
|
||||
import org.apache.druid.timeline.SegmentId;
|
||||
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
|
||||
|
@ -124,6 +127,8 @@ import java.util.LinkedHashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
|
@ -1086,6 +1091,7 @@ public class IndexTaskTest extends IngestionTestBase
|
|||
EasyMock.expect(mockDataSegment2.getShardSpec()).andReturn(EasyMock.createMock(ShardSpec.class)).once();
|
||||
|
||||
EasyMock.expect(mockToolbox.getSegmentHandoffNotifierFactory()).andReturn(mockFactory).once();
|
||||
EasyMock.expect(mockToolbox.getEmitter()).andReturn(new NoopServiceEmitter()).anyTimes();
|
||||
EasyMock.expect(mockDataSegment1.getDataSource()).andReturn("MockDataSource").once();
|
||||
EasyMock.expect(mockFactory.createSegmentHandoffNotifier("MockDataSource")).andReturn(mockNotifier).once();
|
||||
mockNotifier.start();
|
||||
|
@ -1150,6 +1156,8 @@ public class IndexTaskTest extends IngestionTestBase
|
|||
EasyMock.expect(mockToolbox.getSegmentHandoffNotifierFactory())
|
||||
.andReturn(new NoopSegmentHandoffNotifierFactory())
|
||||
.once();
|
||||
EasyMock.expect(mockToolbox.getEmitter()).andReturn(new NoopServiceEmitter()).anyTimes();
|
||||
|
||||
EasyMock.expect(mockDataSegment1.getDataSource()).andReturn("MockDataSource").once();
|
||||
|
||||
EasyMock.replay(mockToolbox);
|
||||
|
@ -1160,6 +1168,67 @@ public class IndexTaskTest extends IngestionTestBase
|
|||
EasyMock.verify(mockDataSegment1, mockDataSegment2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWaitForSegmentAvailabilityEmitsExpectedMetric() throws IOException, InterruptedException
|
||||
{
|
||||
final File tmpDir = temporaryFolder.newFolder();
|
||||
|
||||
LatchableServiceEmitter latchEmitter = new LatchableServiceEmitter();
|
||||
latchEmitter.latch = new CountDownLatch(1);
|
||||
|
||||
TaskToolbox mockToolbox = EasyMock.createMock(TaskToolbox.class);
|
||||
|
||||
DataSegment mockDataSegment1 = EasyMock.createMock(DataSegment.class);
|
||||
DataSegment mockDataSegment2 = EasyMock.createMock(DataSegment.class);
|
||||
List<DataSegment> segmentsToWaitFor = new ArrayList<>();
|
||||
segmentsToWaitFor.add(mockDataSegment1);
|
||||
segmentsToWaitFor.add(mockDataSegment2);
|
||||
|
||||
IndexTask indexTask = new IndexTask(
|
||||
null,
|
||||
null,
|
||||
createDefaultIngestionSpec(
|
||||
jsonMapper,
|
||||
tmpDir,
|
||||
new UniformGranularitySpec(
|
||||
Granularities.HOUR,
|
||||
Granularities.MINUTE,
|
||||
null
|
||||
),
|
||||
null,
|
||||
createTuningConfigWithMaxRowsPerSegment(2, true),
|
||||
false,
|
||||
false
|
||||
),
|
||||
null
|
||||
);
|
||||
|
||||
EasyMock.expect(mockDataSegment1.getInterval()).andReturn(Intervals.of("1970-01-01/1971-01-01")).once();
|
||||
EasyMock.expect(mockDataSegment1.getVersion()).andReturn("dummyString").once();
|
||||
EasyMock.expect(mockDataSegment1.getShardSpec()).andReturn(EasyMock.createMock(ShardSpec.class)).once();
|
||||
EasyMock.expect(mockDataSegment1.getId()).andReturn(SegmentId.dummy("MockDataSource")).once();
|
||||
EasyMock.expect(mockDataSegment2.getInterval()).andReturn(Intervals.of("1971-01-01/1972-01-01")).once();
|
||||
EasyMock.expect(mockDataSegment2.getVersion()).andReturn("dummyString").once();
|
||||
EasyMock.expect(mockDataSegment2.getShardSpec()).andReturn(EasyMock.createMock(ShardSpec.class)).once();
|
||||
EasyMock.expect(mockDataSegment2.getId()).andReturn(SegmentId.dummy("MockDataSource")).once();
|
||||
|
||||
EasyMock.expect(mockToolbox.getSegmentHandoffNotifierFactory())
|
||||
.andReturn(new NoopSegmentHandoffNotifierFactory())
|
||||
.once();
|
||||
EasyMock.expect(mockToolbox.getEmitter())
|
||||
.andReturn(latchEmitter).anyTimes();
|
||||
|
||||
EasyMock.expect(mockDataSegment1.getDataSource()).andReturn("MockDataSource").once();
|
||||
|
||||
EasyMock.replay(mockToolbox);
|
||||
EasyMock.replay(mockDataSegment1, mockDataSegment2);
|
||||
|
||||
Assert.assertTrue(indexTask.waitForSegmentAvailability(mockToolbox, segmentsToWaitFor, 30000));
|
||||
latchEmitter.latch.await(300000, TimeUnit.MILLISECONDS);
|
||||
EasyMock.verify(mockToolbox);
|
||||
EasyMock.verify(mockDataSegment1, mockDataSegment2);
|
||||
}
|
||||
|
||||
private static void populateRollupTestData(File tmpFile) throws IOException
|
||||
{
|
||||
try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
|
||||
|
@ -2709,6 +2778,27 @@ public class IndexTaskTest extends IngestionTestBase
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Used to test that expected metric is emitted by AbstractBatchIndexTask#waitForSegmentAvailability
|
||||
*/
|
||||
private static class LatchableServiceEmitter extends ServiceEmitter
|
||||
{
|
||||
private CountDownLatch latch;
|
||||
|
||||
private LatchableServiceEmitter()
|
||||
{
|
||||
super("", "", null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void emit(Event event)
|
||||
{
|
||||
if (latch != null && "task/segmentAvailability/wait/time".equals(event.toMap().get("metric"))) {
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEqualsAndHashCode()
|
||||
{
|
||||
|
|
|
@ -1360,6 +1360,7 @@ numMetrics
|
|||
poolKind
|
||||
poolName
|
||||
remoteAddress
|
||||
segmentAvailabilityConfirmed
|
||||
serviceName
|
||||
taskStatus
|
||||
taskType
|
||||
|
|
Loading…
Reference in New Issue