diff --git a/common/pom.xml b/common/pom.xml index 0f07a4105ab..326d490d5da 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -152,6 +152,11 @@ org.slf4j jcl-over-slf4j + + io.dropwizard.metrics + metrics-core + + com.lmax diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index 734c8321aac..b156d3a6a28 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -50,7 +50,6 @@ import io.druid.discovery.DiscoveryDruidNode; import io.druid.discovery.DruidNodeDiscoveryProvider; import io.druid.discovery.LookupNodeService; import io.druid.indexer.IngestionState; -import io.druid.indexer.TaskMetricsGetter; import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; import io.druid.indexing.common.IngestionStatsAndErrorsTaskReport; @@ -63,6 +62,8 @@ import io.druid.indexing.common.actions.CheckPointDataSourceMetadataAction; import io.druid.indexing.common.actions.ResetDataSourceMetadataAction; import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.stats.RowIngestionMeters; +import io.druid.indexing.common.stats.RowIngestionMetersFactory; import io.druid.indexing.common.task.AbstractTask; import io.druid.indexing.common.task.IndexTaskUtils; import io.druid.indexing.common.task.RealtimeIndexTask; @@ -86,7 +87,6 @@ import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeIOConfig; import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.FireDepartmentMetrics; -import io.druid.segment.realtime.FireDepartmentMetricsTaskMetricsGetter; import io.druid.segment.realtime.appenderator.Appenderator; import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; import io.druid.segment.realtime.appenderator.Appenderators; @@ -244,6 +244,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler private final CountDownLatch waitForPublishes = new CountDownLatch(1); private final AtomicReference throwableAtomicReference = new AtomicReference<>(); private final String topic; + private final RowIngestionMeters rowIngestionMeters; private volatile CopyOnWriteArrayList sequences; private ListeningExecutorService publishExecService; @@ -251,7 +252,6 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler private CircularBuffer savedParseExceptions; private IngestionState ingestionState; - private TaskMetricsGetter metricsGetter; private String errorMsg; @JsonCreator @@ -263,7 +263,8 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler @JsonProperty("ioConfig") KafkaIOConfig ioConfig, @JsonProperty("context") Map context, @JacksonInject ChatHandlerProvider chatHandlerProvider, - @JacksonInject AuthorizerMapper authorizerMapper + @JacksonInject AuthorizerMapper authorizerMapper, + @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory ) { super( @@ -284,6 +285,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler this.topic = ioConfig.getStartPartitions().getTopic(); this.sequences = new CopyOnWriteArrayList<>(); this.ingestionState = IngestionState.NOT_STARTED; + this.rowIngestionMeters = rowIngestionMetersFactory.createRowIngestionMeters(); if (context != null && context.get(KafkaSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED) != null && ((boolean) context.get(KafkaSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED))) { @@ -511,8 +513,8 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler null ); fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics(); - metricsGetter = new FireDepartmentMetricsTaskMetricsGetter(fireDepartmentMetrics); - toolbox.getMonitorScheduler().addMonitor(TaskRealtimeMetricsMonitorBuilder.build(this, fireDepartmentForMetrics)); + toolbox.getMonitorScheduler() + .addMonitor(TaskRealtimeMetricsMonitorBuilder.build(this, fireDepartmentForMetrics, rowIngestionMeters)); LookupNodeService lookupNodeService = getContextValue(RealtimeIndexTask.CTX_KEY_LOOKUP_TIER) == null ? toolbox.getLookupNodeService() : @@ -758,10 +760,10 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler if (addResult.getParseException() != null) { handleParseException(addResult.getParseException(), record); } else { - fireDepartmentMetrics.incrementProcessed(); + rowIngestionMeters.incrementProcessed(); } } else { - fireDepartmentMetrics.incrementThrownAway(); + rowIngestionMeters.incrementThrownAway(); } } if (isPersistRequired) { @@ -950,8 +952,8 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler null ); fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics(); - metricsGetter = new FireDepartmentMetricsTaskMetricsGetter(fireDepartmentMetrics); - toolbox.getMonitorScheduler().addMonitor(TaskRealtimeMetricsMonitorBuilder.build(this, fireDepartmentForMetrics)); + toolbox.getMonitorScheduler() + .addMonitor(TaskRealtimeMetricsMonitorBuilder.build(this, fireDepartmentForMetrics, rowIngestionMeters)); LookupNodeService lookupNodeService = getContextValue(RealtimeIndexTask.CTX_KEY_LOOKUP_TIER) == null ? toolbox.getLookupNodeService() : @@ -1148,10 +1150,10 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler if (addResult.getParseException() != null) { handleParseException(addResult.getParseException(), record); } else { - fireDepartmentMetrics.incrementProcessed(); + rowIngestionMeters.incrementProcessed(); } } else { - fireDepartmentMetrics.incrementThrownAway(); + rowIngestionMeters.incrementThrownAway(); } } @@ -1296,9 +1298,9 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler private void handleParseException(ParseException pe, ConsumerRecord record) { if (pe.isFromPartiallyValidRow()) { - fireDepartmentMetrics.incrementProcessedWithErrors(); + rowIngestionMeters.incrementProcessedWithError(); } else { - fireDepartmentMetrics.incrementUnparseable(); + rowIngestionMeters.incrementUnparseable(); } if (tuningConfig.isLogParseExceptions()) { @@ -1314,7 +1316,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler savedParseExceptions.add(pe); } - if (fireDepartmentMetrics.unparseable() + fireDepartmentMetrics.processedWithErrors() + if (rowIngestionMeters.getUnparseable() + rowIngestionMeters.getProcessedWithError() > tuningConfig.getMaxParseExceptions()) { log.error("Max parse exceptions exceeded, terminating task..."); throw new RuntimeException("Max parse exceptions exceeded, terminating task..."); @@ -1341,7 +1343,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler Map unparseableEventsMap = Maps.newHashMap(); List buildSegmentsParseExceptionMessages = IndexTaskUtils.getMessagesFromSavedParseExceptions(savedParseExceptions); if (buildSegmentsParseExceptionMessages != null) { - unparseableEventsMap.put("buildSegments", buildSegmentsParseExceptionMessages); + unparseableEventsMap.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegmentsParseExceptionMessages); } return unparseableEventsMap; } @@ -1349,12 +1351,10 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler private Map getTaskCompletionRowStats() { Map metrics = Maps.newHashMap(); - if (metricsGetter != null) { - metrics.put( - "buildSegments", - metricsGetter.getTotalMetrics() - ); - } + metrics.put( + RowIngestionMeters.BUILD_SEGMENTS, + rowIngestionMeters.getTotals() + ); return metrics; } @@ -1574,14 +1574,18 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler authorizationCheck(req, Action.READ); Map returnMap = Maps.newHashMap(); Map totalsMap = Maps.newHashMap(); + Map averagesMap = Maps.newHashMap(); - if (metricsGetter != null) { - totalsMap.put( - "buildSegments", - metricsGetter.getTotalMetrics() - ); - } + totalsMap.put( + RowIngestionMeters.BUILD_SEGMENTS, + rowIngestionMeters.getTotals() + ); + averagesMap.put( + RowIngestionMeters.BUILD_SEGMENTS, + rowIngestionMeters.getMovingAverages() + ); + returnMap.put("movingAverages", averagesMap); returnMap.put("totals", totalsMap); return Response.ok(returnMap).build(); } @@ -1885,6 +1889,12 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler return fireDepartmentMetrics; } + @VisibleForTesting + RowIngestionMeters getRowIngestionMeters() + { + return rowIngestionMeters; + } + private boolean isPaused() { return status == Status.PAUSED; diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java index 315cc0fda93..544aa53c8bd 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -39,6 +38,7 @@ import io.druid.java.util.common.IOE; import io.druid.java.util.common.ISE; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.concurrent.Execs; +import io.druid.java.util.common.jackson.JacksonUtils; import io.druid.java.util.emitter.EmittingLogger; import io.druid.java.util.http.client.HttpClient; import io.druid.java.util.http.client.Request; @@ -57,6 +57,7 @@ import java.io.IOException; import java.net.Socket; import java.net.URI; import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.Callable; @@ -215,8 +216,10 @@ public class KafkaIndexTaskClient return ImmutableMap.of(); } catch (IOException | InterruptedException e) { - log.error("Exception [%s] while pausing Task [%s]", e.getMessage(), id); - throw Throwables.propagate(e); + throw new RuntimeException( + StringUtils.format("Exception [%s] while pausing Task [%s]", e.getMessage(), id), + e + ); } } @@ -232,7 +235,7 @@ public class KafkaIndexTaskClient return KafkaIndexTask.Status.NOT_STARTED; } catch (IOException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } @@ -250,10 +253,48 @@ public class KafkaIndexTaskClient return null; } catch (IOException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } + public Map getMovingAverages(final String id) + { + log.debug("GetMovingAverages task[%s]", id); + + try { + final FullResponseHolder response = submitRequest( + id, + HttpMethod.GET, + "rowStats", + null, + true + ); + return response.getContent() == null || response.getContent().isEmpty() + ? Collections.emptyMap() + : jsonMapper.readValue(response.getContent(), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT); + } + catch (NoTaskLocationException e) { + return Collections.emptyMap(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + public ListenableFuture> getMovingAveragesAsync(final String id) + { + return executorService.submit( + new Callable>() + { + @Override + public Map call() + { + return getMovingAverages(id); + } + } + ); + } + public Map getCurrentOffsets(final String id, final boolean retry) { log.debug("GetCurrentOffsets task[%s] retry[%s]", id, retry); @@ -268,7 +309,7 @@ public class KafkaIndexTaskClient return ImmutableMap.of(); } catch (IOException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } @@ -285,7 +326,7 @@ public class KafkaIndexTaskClient return EMPTY_TREE_MAP; } catch (IOException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } @@ -313,7 +354,7 @@ public class KafkaIndexTaskClient return ImmutableMap.of(); } catch (IOException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } @@ -341,7 +382,7 @@ public class KafkaIndexTaskClient return false; } catch (IOException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } @@ -552,11 +593,17 @@ public class KafkaIndexTaskClient log.debug("HTTP %s: %s", method.getName(), serviceUri.toString()); response = httpClient.go(request, new FullResponseHandler(StandardCharsets.UTF_8), httpTimeout).get(); + + } + catch (IOException | ChannelException ioce) { + throw ioce; + } + catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException(ie); } catch (Exception e) { - Throwables.propagateIfInstanceOf(e.getCause(), IOException.class); - Throwables.propagateIfInstanceOf(e.getCause(), ChannelException.class); - throw Throwables.propagate(e); + throw new RuntimeException(e); } int responseCode = response.getStatus().getCode(); @@ -605,10 +652,10 @@ public class KafkaIndexTaskClient // if retry=false, we probably aren't too concerned if the operation doesn't succeed (i.e. the request was // for informational purposes only) so don't log a scary stack trace log.info("submitRequest failed for [%s], with message [%s]", urlForLog, e.getMessage()); - Throwables.propagate(e); + throw new RuntimeException(e); } else if (delay == null) { log.warn(e, "Retries exhausted for [%s], last exception:", urlForLog); - Throwables.propagate(e); + throw new RuntimeException(e); } else { try { final long sleepTime = delay.getMillis(); @@ -622,7 +669,9 @@ public class KafkaIndexTaskClient Thread.sleep(sleepTime); } catch (InterruptedException e2) { - Throwables.propagate(e2); + Thread.currentThread().interrupt(); + e.addSuppressed(e2); + throw new RuntimeException(e); } } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 13388e8de08..f592724b42b 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -45,6 +45,7 @@ import com.google.common.util.concurrent.MoreExecutors; import io.druid.indexer.TaskLocation; import io.druid.indexing.common.TaskInfoProvider; import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.stats.RowIngestionMetersFactory; import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.TaskResource; import io.druid.indexing.kafka.KafkaDataSourceMetadata; @@ -228,6 +229,7 @@ public class KafkaSupervisor implements Supervisor private final String supervisorId; private final TaskInfoProvider taskInfoProvider; private final long futureTimeoutInSeconds; // how long to wait for async operations to complete + private final RowIngestionMetersFactory rowIngestionMetersFactory; private final ExecutorService exec; private final ScheduledExecutorService scheduledExec; @@ -254,7 +256,8 @@ public class KafkaSupervisor implements Supervisor final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, final KafkaIndexTaskClientFactory taskClientFactory, final ObjectMapper mapper, - final KafkaSupervisorSpec spec + final KafkaSupervisorSpec spec, + final RowIngestionMetersFactory rowIngestionMetersFactory ) { this.taskStorage = taskStorage; @@ -264,6 +267,7 @@ public class KafkaSupervisor implements Supervisor this.spec = spec; this.emitter = spec.getEmitter(); this.monitorSchedulerConfig = spec.getMonitorSchedulerConfig(); + this.rowIngestionMetersFactory = rowIngestionMetersFactory; this.dataSource = spec.getDataSchema().getDataSource(); this.ioConfig = spec.getIoConfig(); @@ -484,6 +488,22 @@ public class KafkaSupervisor implements Supervisor return generateReport(true); } + @Override + public Map> getStats() + { + try { + return getCurrentTotalStats(); + } + catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + log.error(ie, "getStats() interrupted."); + throw new RuntimeException(ie); + } + catch (ExecutionException | TimeoutException eete) { + throw new RuntimeException(eete); + } + } + @Override public void reset(DataSourceMetadata dataSourceMetadata) { @@ -1787,7 +1807,8 @@ public class KafkaSupervisor implements Supervisor kafkaIOConfig, context, null, - null + null, + rowIngestionMetersFactory ); Optional taskQueue = taskMaster.getTaskQueue(); @@ -2198,4 +2219,106 @@ public class KafkaSupervisor implements Supervisor } }; } + + /** + * Collect row ingestion stats from all tasks managed by this supervisor. + * + * @return A map of groupId->taskId->task row stats + * + * @throws InterruptedException + * @throws ExecutionException + * @throws TimeoutException + */ + private Map> getCurrentTotalStats() throws InterruptedException, ExecutionException, TimeoutException + { + Map> allStats = Maps.newHashMap(); + final List> futures = new ArrayList<>(); + final List> groupAndTaskIds = new ArrayList<>(); + + for (int groupId : taskGroups.keySet()) { + TaskGroup group = taskGroups.get(groupId); + for (String taskId : group.taskIds()) { + futures.add( + Futures.transform( + taskClient.getMovingAveragesAsync(taskId), + (Function, StatsFromTaskResult>) (currentStats) -> { + return new StatsFromTaskResult( + groupId, + taskId, + currentStats + ); + } + ) + ); + groupAndTaskIds.add(new Pair<>(groupId, taskId)); + } + } + + for (int groupId : pendingCompletionTaskGroups.keySet()) { + TaskGroup group = taskGroups.get(groupId); + for (String taskId : group.taskIds()) { + futures.add( + Futures.transform( + taskClient.getMovingAveragesAsync(taskId), + (Function, StatsFromTaskResult>) (currentStats) -> { + return new StatsFromTaskResult( + groupId, + taskId, + currentStats + ); + } + ) + ); + groupAndTaskIds.add(new Pair<>(groupId, taskId)); + } + } + + List results = Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS); + for (int i = 0; i < results.size(); i++) { + StatsFromTaskResult result = results.get(i); + if (result != null) { + Map groupMap = allStats.computeIfAbsent(result.getGroupId(), k -> Maps.newHashMap()); + groupMap.put(result.getTaskId(), result.getStats()); + } else { + Pair groupAndTaskId = groupAndTaskIds.get(i); + log.error("Failed to get stats for group[%d]-task[%s]", groupAndTaskId.lhs, groupAndTaskId.rhs); + } + } + + return allStats; + } + + private static class StatsFromTaskResult + { + private final String groupId; + private final String taskId; + private final Map stats; + + public StatsFromTaskResult( + int groupId, + String taskId, + Map stats + ) + { + this.groupId = String.valueOf(groupId); + this.taskId = taskId; + this.stats = stats; + } + + public String getGroupId() + { + return groupId; + } + + public String getTaskId() + { + return taskId; + } + + public Map getStats() + { + return stats; + } + } + } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index 53678810cdc..b364007aefc 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import io.druid.guice.annotations.Json; +import io.druid.indexing.common.stats.RowIngestionMetersFactory; import io.druid.indexing.kafka.KafkaIndexTaskClientFactory; import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import io.druid.indexing.overlord.TaskMaster; @@ -53,6 +54,7 @@ public class KafkaSupervisorSpec implements SupervisorSpec private final ObjectMapper mapper; private final ServiceEmitter emitter; private final DruidMonitorSchedulerConfig monitorSchedulerConfig; + private final RowIngestionMetersFactory rowIngestionMetersFactory; @JsonCreator public KafkaSupervisorSpec( @@ -66,7 +68,8 @@ public class KafkaSupervisorSpec implements SupervisorSpec @JacksonInject KafkaIndexTaskClientFactory kafkaIndexTaskClientFactory, @JacksonInject @Json ObjectMapper mapper, @JacksonInject ServiceEmitter emitter, - @JacksonInject DruidMonitorSchedulerConfig monitorSchedulerConfig + @JacksonInject DruidMonitorSchedulerConfig monitorSchedulerConfig, + @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory ) { this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema"); @@ -106,6 +109,7 @@ public class KafkaSupervisorSpec implements SupervisorSpec this.mapper = mapper; this.emitter = emitter; this.monitorSchedulerConfig = monitorSchedulerConfig; + this.rowIngestionMetersFactory = rowIngestionMetersFactory; } @JsonProperty @@ -157,7 +161,8 @@ public class KafkaSupervisorSpec implements SupervisorSpec indexerMetadataStorageCoordinator, kafkaIndexTaskClientFactory, mapper, - this + this, + rowIngestionMetersFactory ); } diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java index 401c98d8f18..b4561a53993 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java @@ -50,7 +50,9 @@ import org.joda.time.Duration; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -67,6 +69,9 @@ import static org.easymock.EasyMock.reset; @RunWith(Parameterized.class) public class KafkaIndexTaskClientTest extends EasyMockSupport { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + private static final ObjectMapper objectMapper = new DefaultObjectMapper(); private static final String TEST_ID = "test-id"; private static final List TEST_IDS = Lists.newArrayList("test-id1", "test-id2", "test-id3", "test-id4"); @@ -149,9 +154,12 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport verifyAll(); } - @Test(expected = KafkaIndexTaskClient.TaskNotRunnableException.class) + @Test public void testTaskNotRunnableException() { + expectedException.expect(KafkaIndexTaskClient.TaskNotRunnableException.class); + expectedException.expectMessage("Aborting request because task [test-id] is not runnable"); + reset(taskInfoProvider); expect(taskInfoProvider.getTaskLocation(TEST_ID)).andReturn(new TaskLocation(TEST_HOST, TEST_PORT, TEST_TLS_PORT)) .anyTimes(); @@ -162,9 +170,12 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport verifyAll(); } - @Test(expected = RuntimeException.class) + @Test public void testInternalServerError() { + expectedException.expect(RuntimeException.class); + expectedException.expectMessage("io.druid.java.util.common.IOE: Received status [500]"); + expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.INTERNAL_SERVER_ERROR).times(2); expect( httpClient.go( @@ -181,9 +192,12 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport verifyAll(); } - @Test(expected = IAE.class) + @Test public void testBadRequest() { + expectedException.expect(IAE.class); + expectedException.expectMessage("Received 400 Bad Request with body:"); + expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.BAD_REQUEST).times(2); expect(responseHolder.getContent()).andReturn(""); expect( @@ -293,9 +307,12 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport Assert.assertEquals(10, (long) results.get(1)); } - @Test(expected = RuntimeException.class) + @Test public void testGetCurrentOffsetsWithExhaustedRetries() { + expectedException.expect(RuntimeException.class); + expectedException.expectMessage("io.druid.java.util.common.IOE: Received status [404]"); + client = new TestableKafkaIndexTaskClient(httpClient, objectMapper, taskInfoProvider, 2); expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).anyTimes(); diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index e3b15b0c195..baa53af71d6 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -39,10 +39,11 @@ import com.google.common.util.concurrent.MoreExecutors; import io.druid.data.input.impl.FloatDimensionSchema; import io.druid.data.input.impl.LongDimensionSchema; import io.druid.data.input.impl.StringDimensionSchema; -import io.druid.indexer.TaskMetricsUtils; import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import io.druid.indexing.common.TaskReport; import io.druid.indexing.common.TaskReportFileWriter; +import io.druid.indexing.common.stats.RowIngestionMeters; +import io.druid.indexing.common.stats.RowIngestionMetersFactory; import io.druid.indexing.common.task.IndexTaskTest; import io.druid.client.cache.CacheConfig; import io.druid.client.cache.MapCache; @@ -210,6 +211,7 @@ public class KafkaIndexTaskTest private final boolean isIncrementalHandoffSupported; private final Set checkpointRequestsHash = Sets.newHashSet(); private File reportsFile; + private RowIngestionMetersFactory rowIngestionMetersFactory; // This should be removed in versions greater that 0.12.x // isIncrementalHandoffSupported should always be set to true in those later versions @@ -393,9 +395,9 @@ public class KafkaIndexTaskTest Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); // Check metrics - Assert.assertEquals(3, task.getFireDepartmentMetrics().processed()); - Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway()); // Check published metadata SegmentDescriptor desc1 = SD(task, "2010/P1D", 0); @@ -447,9 +449,9 @@ public class KafkaIndexTaskTest Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); // Check metrics - Assert.assertEquals(3, task.getFireDepartmentMetrics().processed()); - Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway()); // Check published metadata SegmentDescriptor desc1 = SD(task, "2010/P1D", 0); @@ -525,9 +527,9 @@ public class KafkaIndexTaskTest )); // Check metrics - Assert.assertEquals(8, task.getFireDepartmentMetrics().processed()); - Assert.assertEquals(3, task.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(1, task.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(8, task.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(3, task.getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(1, task.getRowIngestionMeters().getThrownAway()); // Check published metadata SegmentDescriptor desc1 = SD(task, "2008/P1D", 0); @@ -615,9 +617,9 @@ public class KafkaIndexTaskTest )); // Check metrics - Assert.assertEquals(2, task.getFireDepartmentMetrics().processed()); - Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(2, task.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway()); // Check published metadata SegmentDescriptor desc1 = SD(task, "2008/P1D", 0); @@ -669,9 +671,9 @@ public class KafkaIndexTaskTest Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); // Check metrics - Assert.assertEquals(3, task.getFireDepartmentMetrics().processed()); - Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(2, task.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(2, task.getRowIngestionMeters().getThrownAway()); // Check published metadata SegmentDescriptor desc1 = SD(task, "2010/P1D", 0); @@ -723,9 +725,9 @@ public class KafkaIndexTaskTest Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); // Check metrics - Assert.assertEquals(3, task.getFireDepartmentMetrics().processed()); - Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(2, task.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(2, task.getRowIngestionMeters().getThrownAway()); // Check published metadata SegmentDescriptor desc1 = SD(task, "2008/P1D", 0); @@ -787,9 +789,9 @@ public class KafkaIndexTaskTest Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); // Check metrics - Assert.assertEquals(1, task.getFireDepartmentMetrics().processed()); - Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(4, task.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(1, task.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(4, task.getRowIngestionMeters().getThrownAway()); // Check published metadata SegmentDescriptor desc1 = SD(task, "2009/P1D", 0); @@ -835,9 +837,9 @@ public class KafkaIndexTaskTest Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); // Check metrics - Assert.assertEquals(0, task.getFireDepartmentMetrics().processed()); - Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(0, task.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway()); // Check published metadata Assert.assertEquals(ImmutableSet.of(), publishedDescriptors()); @@ -876,9 +878,9 @@ public class KafkaIndexTaskTest Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); // Check metrics - Assert.assertEquals(3, task.getFireDepartmentMetrics().processed()); - Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway()); // Check published metadata SegmentDescriptor desc1 = SD(task, "2010/P1D", 0); @@ -931,9 +933,9 @@ public class KafkaIndexTaskTest ); // Check metrics - Assert.assertEquals(3, task.getFireDepartmentMetrics().processed()); - Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway()); // Check published metadata SegmentDescriptor desc1 = SD(task, "2010/P1D", 0); @@ -986,9 +988,9 @@ public class KafkaIndexTaskTest Assert.assertEquals(TaskState.FAILED, future.get().getStatusCode()); // Check metrics - Assert.assertEquals(3, task.getFireDepartmentMetrics().processed()); - Assert.assertEquals(1, task.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(1, task.getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway()); // Check published metadata Assert.assertEquals(ImmutableSet.of(), publishedDescriptors()); @@ -1033,10 +1035,10 @@ public class KafkaIndexTaskTest Assert.assertEquals(null, status.getErrorMsg()); // Check metrics - Assert.assertEquals(4, task.getFireDepartmentMetrics().processed()); - Assert.assertEquals(3, task.getFireDepartmentMetrics().processedWithErrors()); - Assert.assertEquals(3, task.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(1, task.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(4, task.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(3, task.getRowIngestionMeters().getProcessedWithError()); + Assert.assertEquals(3, task.getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(1, task.getRowIngestionMeters().getThrownAway()); // Check published metadata SegmentDescriptor desc1 = SD(task, "2010/P1D", 0); @@ -1052,18 +1054,18 @@ public class KafkaIndexTaskTest IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); Map expectedMetrics = ImmutableMap.of( - "buildSegments", + RowIngestionMeters.BUILD_SEGMENTS, ImmutableMap.of( - TaskMetricsUtils.ROWS_PROCESSED, 4, - TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS, 3, - TaskMetricsUtils.ROWS_UNPARSEABLE, 3, - TaskMetricsUtils.ROWS_THROWN_AWAY, 1 + RowIngestionMeters.PROCESSED, 4, + RowIngestionMeters.PROCESSED_WITH_ERROR, 3, + RowIngestionMeters.UNPARSEABLE, 3, + RowIngestionMeters.THROWN_AWAY, 1 ) ); Assert.assertEquals(expectedMetrics, reportData.getRowStats()); Map unparseableEvents = ImmutableMap.of( - "buildSegments", + RowIngestionMeters.BUILD_SEGMENTS, Arrays.asList( "Found unparseable columns in row: [MapBasedInputRow{timestamp=2049-01-01T00:00:00.000Z, event={timestamp=2049, dim1=f, dim2=y, dimLong=10, dimFloat=20.0, met1=notanumber}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}], exceptions: [Unable to parse value[notanumber] for field[met1],]", "Found unparseable columns in row: [MapBasedInputRow{timestamp=2049-01-01T00:00:00.000Z, event={timestamp=2049, dim1=f, dim2=y, dimLong=10, dimFloat=notanumber, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}], exceptions: [could not convert value [notanumber] to float,]", @@ -1115,10 +1117,10 @@ public class KafkaIndexTaskTest IndexTaskTest.checkTaskStatusErrorMsgForParseExceptionsExceeded(status); // Check metrics - Assert.assertEquals(3, task.getFireDepartmentMetrics().processed()); - Assert.assertEquals(0, task.getFireDepartmentMetrics().processedWithErrors()); - Assert.assertEquals(3, task.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task.getRowIngestionMeters().getProcessedWithError()); + Assert.assertEquals(3, task.getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway()); // Check published metadata Assert.assertEquals(ImmutableSet.of(), publishedDescriptors()); @@ -1127,18 +1129,18 @@ public class KafkaIndexTaskTest IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); Map expectedMetrics = ImmutableMap.of( - "buildSegments", + RowIngestionMeters.BUILD_SEGMENTS, ImmutableMap.of( - TaskMetricsUtils.ROWS_PROCESSED, 3, - TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS, 0, - TaskMetricsUtils.ROWS_UNPARSEABLE, 3, - TaskMetricsUtils.ROWS_THROWN_AWAY, 0 + RowIngestionMeters.PROCESSED, 3, + RowIngestionMeters.PROCESSED_WITH_ERROR, 0, + RowIngestionMeters.UNPARSEABLE, 3, + RowIngestionMeters.THROWN_AWAY, 0 ) ); Assert.assertEquals(expectedMetrics, reportData.getRowStats()); Map unparseableEvents = ImmutableMap.of( - "buildSegments", + RowIngestionMeters.BUILD_SEGMENTS, Arrays.asList( "Unable to parse row [unparseable2]", "Unable to parse row [unparseable]" @@ -1195,12 +1197,12 @@ public class KafkaIndexTaskTest Assert.assertEquals(TaskState.SUCCESS, future2.get().getStatusCode()); // Check metrics - Assert.assertEquals(3, task1.getFireDepartmentMetrics().processed()); - Assert.assertEquals(0, task1.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway()); - Assert.assertEquals(3, task2.getFireDepartmentMetrics().processed()); - Assert.assertEquals(0, task2.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(0, task2.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(3, task1.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task1.getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, task1.getRowIngestionMeters().getThrownAway()); + Assert.assertEquals(3, task2.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task2.getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, task2.getRowIngestionMeters().getThrownAway()); // Check published segments & metadata SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0); @@ -1264,12 +1266,12 @@ public class KafkaIndexTaskTest Assert.assertEquals(TaskState.FAILED, future2.get().getStatusCode()); // Check metrics - Assert.assertEquals(3, task1.getFireDepartmentMetrics().processed()); - Assert.assertEquals(0, task1.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway()); - Assert.assertEquals(3, task2.getFireDepartmentMetrics().processed()); - Assert.assertEquals(3, task2.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(1, task2.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(3, task1.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task1.getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, task1.getRowIngestionMeters().getThrownAway()); + Assert.assertEquals(3, task2.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(3, task2.getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(1, task2.getRowIngestionMeters().getThrownAway()); // Check published segments & metadata, should all be from the first task SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0); @@ -1339,12 +1341,12 @@ public class KafkaIndexTaskTest Assert.assertEquals(TaskState.SUCCESS, future2.get().getStatusCode()); // Check metrics - Assert.assertEquals(3, task1.getFireDepartmentMetrics().processed()); - Assert.assertEquals(0, task1.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway()); - Assert.assertEquals(3, task2.getFireDepartmentMetrics().processed()); - Assert.assertEquals(3, task2.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(1, task2.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(3, task1.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task1.getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, task1.getRowIngestionMeters().getThrownAway()); + Assert.assertEquals(3, task2.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(3, task2.getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(1, task2.getRowIngestionMeters().getThrownAway()); // Check published segments & metadata SegmentDescriptor desc3 = SD(task2, "2011/P1D", 1); @@ -1391,9 +1393,9 @@ public class KafkaIndexTaskTest Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); // Check metrics - Assert.assertEquals(5, task.getFireDepartmentMetrics().processed()); - Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(5, task.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway()); // Check published segments & metadata SegmentDescriptor desc1 = SD(task, "2010/P1D", 0); @@ -1471,12 +1473,12 @@ public class KafkaIndexTaskTest Assert.assertEquals(TaskState.SUCCESS, future2.get().getStatusCode()); // Check metrics - Assert.assertEquals(3, task1.getFireDepartmentMetrics().processed()); - Assert.assertEquals(0, task1.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway()); - Assert.assertEquals(1, task2.getFireDepartmentMetrics().processed()); - Assert.assertEquals(0, task2.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(0, task2.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(3, task1.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task1.getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, task1.getRowIngestionMeters().getThrownAway()); + Assert.assertEquals(1, task2.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task2.getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, task2.getRowIngestionMeters().getThrownAway()); // Check published segments & metadata SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0); @@ -1562,12 +1564,12 @@ public class KafkaIndexTaskTest Assert.assertEquals(TaskState.SUCCESS, future2.get().getStatusCode()); // Check metrics - Assert.assertEquals(2, task1.getFireDepartmentMetrics().processed()); - Assert.assertEquals(0, task1.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway()); - Assert.assertEquals(1, task2.getFireDepartmentMetrics().processed()); - Assert.assertEquals(0, task2.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(0, task2.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(2, task1.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task1.getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, task1.getRowIngestionMeters().getThrownAway()); + Assert.assertEquals(1, task2.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task2.getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, task2.getRowIngestionMeters().getThrownAway()); // Check published segments & metadata SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0); @@ -1649,9 +1651,9 @@ public class KafkaIndexTaskTest Assert.assertEquals(task.getEndOffsets(), task.getCurrentOffsets()); // Check metrics - Assert.assertEquals(3, task.getFireDepartmentMetrics().processed()); - Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway()); // Check published metadata SegmentDescriptor desc1 = SD(task, "2010/P1D", 0); @@ -1734,9 +1736,9 @@ public class KafkaIndexTaskTest Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); // Check metrics - Assert.assertEquals(4, task.getFireDepartmentMetrics().processed()); - Assert.assertEquals(2, task.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(4, task.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(2, task.getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway()); // Check published metadata SegmentDescriptor desc1 = SD(task, "2009/P1D", 0); @@ -1872,9 +1874,9 @@ public class KafkaIndexTaskTest Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); // Check metrics - Assert.assertEquals(3, task.getFireDepartmentMetrics().processed()); - Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway()); // Check published metadata SegmentDescriptor desc1 = SD(task, "2010/P1D", 0); @@ -1992,7 +1994,8 @@ public class KafkaIndexTaskTest ioConfig, context, null, - null + null, + rowIngestionMetersFactory ); task.setPollRetryMs(POLL_RETRY_MS); return task; @@ -2036,7 +2039,8 @@ public class KafkaIndexTaskTest ioConfig, context, null, - null + null, + rowIngestionMetersFactory ); task.setPollRetryMs(POLL_RETRY_MS); return task; @@ -2093,6 +2097,7 @@ public class KafkaIndexTaskTest { directory = tempFolder.newFolder(); final TestUtils testUtils = new TestUtils(); + rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory(); final ObjectMapper objectMapper = testUtils.getTestObjectMapper(); for (Module module : new KafkaIndexTaskModule().getJacksonModules()) { objectMapper.registerModule(module); diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index cbea2344b22..b60393ee955 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -34,6 +34,8 @@ import io.druid.data.input.impl.TimestampSpec; import io.druid.indexer.TaskLocation; import io.druid.indexing.common.TaskInfoProvider; import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TestUtils; +import io.druid.indexing.common.stats.RowIngestionMetersFactory; import io.druid.indexing.common.task.RealtimeIndexTask; import io.druid.indexing.common.task.Task; import io.druid.indexing.kafka.KafkaDataSourceMetadata; @@ -138,6 +140,7 @@ public class KafkaSupervisorTest extends EasyMockSupport private KafkaIndexTaskClient taskClient; private TaskQueue taskQueue; private String topic; + private RowIngestionMetersFactory rowIngestionMetersFactory; private static String getTopic() { @@ -209,6 +212,7 @@ public class KafkaSupervisorTest extends EasyMockSupport ); topic = getTopic(); + rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); } @After @@ -1982,8 +1986,10 @@ public class KafkaSupervisorTest extends EasyMockSupport taskClientFactory, objectMapper, new NoopServiceEmitter(), - new DruidMonitorSchedulerConfig() - ) + new DruidMonitorSchedulerConfig(), + rowIngestionMetersFactory + ), + rowIngestionMetersFactory ); } @@ -2050,7 +2056,8 @@ public class KafkaSupervisorTest extends EasyMockSupport ), ImmutableMap.of(), null, - null + null, + rowIngestionMetersFactory ); } @@ -2096,10 +2103,19 @@ public class KafkaSupervisorTest extends EasyMockSupport IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, KafkaIndexTaskClientFactory taskClientFactory, ObjectMapper mapper, - KafkaSupervisorSpec spec + KafkaSupervisorSpec spec, + RowIngestionMetersFactory rowIngestionMetersFactory ) { - super(taskStorage, taskMaster, indexerMetadataStorageCoordinator, taskClientFactory, mapper, spec); + super( + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + taskClientFactory, + mapper, + spec, + rowIngestionMetersFactory + ); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java index 05cf88b51a2..732bda3836b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskRealtimeMetricsMonitorBuilder.java @@ -21,6 +21,8 @@ package io.druid.indexing.common; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import io.druid.indexing.common.stats.RowIngestionMeters; +import io.druid.indexing.common.stats.TaskRealtimeMetricsMonitor; import io.druid.indexing.common.task.Task; import io.druid.query.DruidMetrics; import io.druid.segment.realtime.FireDepartment; @@ -40,4 +42,16 @@ public class TaskRealtimeMetricsMonitorBuilder ) ); } + + public static TaskRealtimeMetricsMonitor build(Task task, FireDepartment fireDepartment, RowIngestionMeters meters) + { + return new TaskRealtimeMetricsMonitor( + fireDepartment, + meters, + ImmutableMap.of( + DruidMetrics.TASK_ID, new String[]{task.getId()}, + DruidMetrics.TASK_TYPE, new String[]{task.getType()} + ) + ); + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java index 800c0300d43..28072be3f7e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java @@ -52,8 +52,6 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera return Files.createTempDir(); } - - private final int maxRowsInMemory; private final int maxRowsPerSegment; private final long maxBytesInMemory; diff --git a/indexing-service/src/main/java/io/druid/indexing/common/stats/DropwizardRowIngestionMeters.java b/indexing-service/src/main/java/io/druid/indexing/common/stats/DropwizardRowIngestionMeters.java new file mode 100644 index 00000000000..93653464bfe --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/common/stats/DropwizardRowIngestionMeters.java @@ -0,0 +1,137 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common.stats; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import com.google.common.collect.Maps; + +import java.util.Map; + +public class DropwizardRowIngestionMeters implements RowIngestionMeters +{ + public static final String ONE_MINUTE_NAME = "1m"; + public static final String FIVE_MINUTE_NAME = "5m"; + public static final String FIFTEEN_MINUTE_NAME = "15m"; + + private final MetricRegistry metricRegistry; + private final Meter processed; + private final Meter processedWithError; + private final Meter unparseable; + private final Meter thrownAway; + + public DropwizardRowIngestionMeters() + { + this.metricRegistry = new MetricRegistry(); + this.processed = metricRegistry.meter(PROCESSED); + this.processedWithError = metricRegistry.meter(PROCESSED_WITH_ERROR); + this.unparseable = metricRegistry.meter(UNPARSEABLE); + this.thrownAway = metricRegistry.meter(THROWN_AWAY); + } + + @Override + public long getProcessed() + { + return processed.getCount(); + } + + @Override + public void incrementProcessed() + { + processed.mark(); + } + + @Override + public long getProcessedWithError() + { + return processedWithError.getCount(); + } + + @Override + public void incrementProcessedWithError() + { + processedWithError.mark(); + } + + @Override + public long getUnparseable() + { + return unparseable.getCount(); + } + + @Override + public void incrementUnparseable() + { + unparseable.mark(); + } + + @Override + public long getThrownAway() + { + return thrownAway.getCount(); + } + + @Override + public void incrementThrownAway() + { + thrownAway.mark(); + } + + @Override + public RowIngestionMetersTotals getTotals() + { + return new RowIngestionMetersTotals( + processed.getCount(), + processedWithError.getCount(), + thrownAway.getCount(), + unparseable.getCount() + ); + } + + @Override + public Map getMovingAverages() + { + Map movingAverages = Maps.newHashMap(); + + Map oneMinute = Maps.newHashMap(); + oneMinute.put(PROCESSED, processed.getOneMinuteRate()); + oneMinute.put(PROCESSED_WITH_ERROR, processedWithError.getOneMinuteRate()); + oneMinute.put(UNPARSEABLE, unparseable.getOneMinuteRate()); + oneMinute.put(THROWN_AWAY, thrownAway.getOneMinuteRate()); + + Map fiveMinute = Maps.newHashMap(); + fiveMinute.put(PROCESSED, processed.getFiveMinuteRate()); + fiveMinute.put(PROCESSED_WITH_ERROR, processedWithError.getFiveMinuteRate()); + fiveMinute.put(UNPARSEABLE, unparseable.getFiveMinuteRate()); + fiveMinute.put(THROWN_AWAY, thrownAway.getFiveMinuteRate()); + + Map fifteenMinute = Maps.newHashMap(); + fifteenMinute.put(PROCESSED, processed.getFifteenMinuteRate()); + fifteenMinute.put(PROCESSED_WITH_ERROR, processedWithError.getFifteenMinuteRate()); + fifteenMinute.put(UNPARSEABLE, unparseable.getFifteenMinuteRate()); + fifteenMinute.put(THROWN_AWAY, thrownAway.getFifteenMinuteRate()); + + movingAverages.put(ONE_MINUTE_NAME, oneMinute); + movingAverages.put(FIVE_MINUTE_NAME, fiveMinute); + movingAverages.put(FIFTEEN_MINUTE_NAME, fifteenMinute); + + return movingAverages; + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/common/stats/DropwizardRowIngestionMetersFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/stats/DropwizardRowIngestionMetersFactory.java new file mode 100644 index 00000000000..3788fbaf71d --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/common/stats/DropwizardRowIngestionMetersFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common.stats; + +public class DropwizardRowIngestionMetersFactory implements RowIngestionMetersFactory +{ + @Override + public RowIngestionMeters createRowIngestionMeters() + { + return new DropwizardRowIngestionMeters(); + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/common/stats/RowIngestionMeters.java b/indexing-service/src/main/java/io/druid/indexing/common/stats/RowIngestionMeters.java new file mode 100644 index 00000000000..7e85eb2f86a --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/common/stats/RowIngestionMeters.java @@ -0,0 +1,58 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common.stats; + +import io.druid.guice.annotations.ExtensionPoint; + +import java.util.Map; + +/** + * A collection of meters for row ingestion stats, with support for moving average calculations. + * This can eventually replace FireDepartmentMetrics, but moving averages for other stats collected by + * FireDepartmentMetrics are not currently supported, so we continue to use FireDepartmentMetrics alongside + * RowIngestionMeters to avoid unnecessary overhead from maintaining these moving averages. + */ +@ExtensionPoint +public interface RowIngestionMeters +{ + String BUILD_SEGMENTS = "buildSegments"; + String DETERMINE_PARTITIONS = "determinePartitions"; + + String PROCESSED = "processed"; + String PROCESSED_WITH_ERROR = "processedWithError"; + String UNPARSEABLE = "unparseable"; + String THROWN_AWAY = "thrownAway"; + + long getProcessed(); + void incrementProcessed(); + + long getProcessedWithError(); + void incrementProcessedWithError(); + + long getUnparseable(); + void incrementUnparseable(); + + long getThrownAway(); + void incrementThrownAway(); + + RowIngestionMetersTotals getTotals(); + + Map getMovingAverages(); +} diff --git a/indexing-service/src/main/java/io/druid/indexing/common/stats/RowIngestionMetersFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/stats/RowIngestionMetersFactory.java new file mode 100644 index 00000000000..3598f1f164a --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/common/stats/RowIngestionMetersFactory.java @@ -0,0 +1,25 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common.stats; + +public interface RowIngestionMetersFactory +{ + RowIngestionMeters createRowIngestionMeters(); +} diff --git a/indexing-service/src/main/java/io/druid/indexing/common/stats/RowIngestionMetersTotals.java b/indexing-service/src/main/java/io/druid/indexing/common/stats/RowIngestionMetersTotals.java new file mode 100644 index 00000000000..bb7a1dbceaa --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/common/stats/RowIngestionMetersTotals.java @@ -0,0 +1,69 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common.stats; + +import com.amazonaws.thirdparty.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class RowIngestionMetersTotals +{ + private final long processed; + private final long processedWithError; + private final long thrownAway; + private final long unparseable; + + @JsonCreator + public RowIngestionMetersTotals( + @JsonProperty("processed") long processed, + @JsonProperty("processedWithError") long processedWithError, + @JsonProperty("thrownAway") long thrownAway, + @JsonProperty("unparseable") long unparseable + ) + { + this.processed = processed; + this.processedWithError = processedWithError; + this.thrownAway = thrownAway; + this.unparseable = unparseable; + } + + @JsonProperty + public long getProcessed() + { + return processed; + } + + @JsonProperty + public long getProcessedWithError() + { + return processedWithError; + } + + @JsonProperty + public long getThrownAway() + { + return thrownAway; + } + + @JsonProperty + public long getUnparseable() + { + return unparseable; + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java b/indexing-service/src/main/java/io/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java new file mode 100644 index 00000000000..48907fea694 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java @@ -0,0 +1,122 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common.stats; + +import com.google.common.collect.ImmutableMap; +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.java.util.emitter.service.ServiceEmitter; +import io.druid.java.util.emitter.service.ServiceMetricEvent; +import io.druid.java.util.metrics.AbstractMonitor; +import io.druid.java.util.metrics.MonitorUtils; +import io.druid.query.DruidMetrics; +import io.druid.segment.realtime.FireDepartment; +import io.druid.segment.realtime.FireDepartmentMetrics; + +import java.util.Map; + +/** + * Replaces the old RealtimeMetricsMonitor for indexing tasks that use a single FireDepartment, with changes to + * read row ingestion stats from RowIngestionMeters (which supports moving averages) instead of FireDepartmentMetrics. + * See comment on RowIngestionMeters for more information regarding relationship between RowIngestionMeters and + * FireDepartmentMetrics. + */ +public class TaskRealtimeMetricsMonitor extends AbstractMonitor +{ + private static final EmittingLogger log = new EmittingLogger(TaskRealtimeMetricsMonitor.class); + + private final FireDepartment fireDepartment; + private final RowIngestionMeters rowIngestionMeters; + private final Map dimensions; + + private FireDepartmentMetrics previousFireDepartmentMetrics; + private RowIngestionMetersTotals previousRowIngestionMetersTotals; + + public TaskRealtimeMetricsMonitor( + FireDepartment fireDepartment, + RowIngestionMeters rowIngestionMeters, + Map dimensions + ) + { + this.fireDepartment = fireDepartment; + this.rowIngestionMeters = rowIngestionMeters; + this.dimensions = ImmutableMap.copyOf(dimensions); + previousFireDepartmentMetrics = new FireDepartmentMetrics(); + previousRowIngestionMetersTotals = new RowIngestionMetersTotals(0, 0, 0, 0); + } + + @Override + public boolean doMonitor(ServiceEmitter emitter) + { + FireDepartmentMetrics metrics = fireDepartment.getMetrics().snapshot(); + RowIngestionMetersTotals rowIngestionMetersTotals = rowIngestionMeters.getTotals(); + + final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder() + .setDimension(DruidMetrics.DATASOURCE, fireDepartment.getDataSchema().getDataSource()); + MonitorUtils.addDimensionsToBuilder(builder, dimensions); + + final long thrownAway = rowIngestionMetersTotals.getThrownAway() - previousRowIngestionMetersTotals.getThrownAway(); + if (thrownAway > 0) { + log.warn("[%,d] events thrown away because they are outside the window period!", thrownAway); + } + emitter.emit(builder.build("ingest/events/thrownAway", thrownAway)); + + final long unparseable = rowIngestionMetersTotals.getUnparseable() - previousRowIngestionMetersTotals.getUnparseable(); + if (unparseable > 0) { + log.error("[%,d] Unparseable events! Turn on debug logging to see exception stack trace.", unparseable); + } + emitter.emit(builder.build("ingest/events/unparseable", unparseable)); + + final long processedWithError = rowIngestionMetersTotals.getProcessedWithError() - previousRowIngestionMetersTotals.getProcessedWithError(); + if (processedWithError > 0) { + log.error("[%,d] events processed with errors! Set logParseExceptions to true in the ingestion spec to log these errors.", processedWithError); + } + emitter.emit(builder.build("ingest/events/processedWithError", processedWithError)); + + emitter.emit(builder.build("ingest/events/processed", rowIngestionMetersTotals.getProcessed() - previousRowIngestionMetersTotals.getProcessed())); + + final long dedup = metrics.dedup() - previousFireDepartmentMetrics.dedup(); + if (dedup > 0) { + log.warn("[%,d] duplicate events!", dedup); + } + emitter.emit(builder.build("ingest/events/duplicate", dedup)); + + emitter.emit(builder.build("ingest/rows/output", metrics.rowOutput() - previousFireDepartmentMetrics.rowOutput())); + emitter.emit(builder.build("ingest/persists/count", metrics.numPersists() - previousFireDepartmentMetrics.numPersists())); + emitter.emit(builder.build("ingest/persists/time", metrics.persistTimeMillis() - previousFireDepartmentMetrics.persistTimeMillis())); + emitter.emit(builder.build("ingest/persists/cpu", metrics.persistCpuTime() - previousFireDepartmentMetrics.persistCpuTime())); + emitter.emit( + builder.build( + "ingest/persists/backPressure", + metrics.persistBackPressureMillis() - previousFireDepartmentMetrics.persistBackPressureMillis() + ) + ); + emitter.emit(builder.build("ingest/persists/failed", metrics.failedPersists() - previousFireDepartmentMetrics.failedPersists())); + emitter.emit(builder.build("ingest/handoff/failed", metrics.failedHandoffs() - previousFireDepartmentMetrics.failedHandoffs())); + emitter.emit(builder.build("ingest/merge/time", metrics.mergeTimeMillis() - previousFireDepartmentMetrics.mergeTimeMillis())); + emitter.emit(builder.build("ingest/merge/cpu", metrics.mergeCpuTime() - previousFireDepartmentMetrics.mergeCpuTime())); + emitter.emit(builder.build("ingest/handoff/count", metrics.handOffCount() - previousFireDepartmentMetrics.handOffCount())); + emitter.emit(builder.build("ingest/sink/count", metrics.sinkCount())); + emitter.emit(builder.build("ingest/events/messageGap", metrics.messageGap())); + + previousRowIngestionMetersTotals = rowIngestionMetersTotals; + previousFireDepartmentMetrics = metrics; + return true; + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 799d1716cba..b78d60f493f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Supplier; import com.google.common.base.Throwables; @@ -38,7 +39,6 @@ import io.druid.discovery.DiscoveryDruidNode; import io.druid.discovery.DruidNodeDiscoveryProvider; import io.druid.discovery.LookupNodeService; import io.druid.indexer.IngestionState; -import io.druid.indexer.TaskMetricsGetter; import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; import io.druid.indexing.common.IngestionStatsAndErrorsTaskReport; @@ -51,6 +51,9 @@ import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.index.RealtimeAppenderatorIngestionSpec; import io.druid.indexing.common.index.RealtimeAppenderatorTuningConfig; +import io.druid.indexing.common.stats.RowIngestionMeters; +import io.druid.indexing.common.stats.RowIngestionMetersFactory; +import io.druid.indexing.common.stats.TaskRealtimeMetricsMonitor; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.ISE; import io.druid.java.util.common.StringUtils; @@ -65,8 +68,6 @@ import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeIOConfig; import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.FireDepartmentMetrics; -import io.druid.segment.realtime.FireDepartmentMetricsTaskMetricsGetter; -import io.druid.segment.realtime.RealtimeMetricsMonitor; import io.druid.segment.realtime.appenderator.Appenderator; import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; import io.druid.segment.realtime.appenderator.Appenderators; @@ -143,7 +144,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements private volatile FireDepartmentMetrics metrics = null; @JsonIgnore - private TaskMetricsGetter metricsGetter; + private final RowIngestionMeters rowIngestionMeters; @JsonIgnore private volatile boolean gracefullyStopped = false; @@ -176,7 +177,8 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements @JsonProperty("spec") RealtimeAppenderatorIngestionSpec spec, @JsonProperty("context") Map context, @JacksonInject ChatHandlerProvider chatHandlerProvider, - @JacksonInject AuthorizerMapper authorizerMapper + @JacksonInject AuthorizerMapper authorizerMapper, + @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory ) { super( @@ -196,6 +198,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements } this.ingestionState = IngestionState.NOT_STARTED; + this.rowIngestionMeters = rowIngestionMetersFactory.createRowIngestionMeters(); } @Override @@ -248,10 +251,13 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements dataSchema, new RealtimeIOConfig(null, null, null), null ); - final RealtimeMetricsMonitor metricsMonitor = TaskRealtimeMetricsMonitorBuilder.build(this, fireDepartmentForMetrics); + final TaskRealtimeMetricsMonitor metricsMonitor = TaskRealtimeMetricsMonitorBuilder.build( + this, + fireDepartmentForMetrics, + rowIngestionMeters + ); this.metrics = fireDepartmentForMetrics.getMetrics(); - metricsGetter = new FireDepartmentMetricsTaskMetricsGetter(metrics); Supplier committerSupplier = null; final File firehoseTempDir = toolbox.getFirehoseTemporaryDir(); @@ -269,6 +275,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements log.warn("No chat handler detected"); } + toolbox.getDataSegmentServerAnnouncer().announce(); toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode); @@ -309,7 +316,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements if (inputRow == null) { log.debug("Discarded null row, considering thrownAway."); - metrics.incrementThrownAway(); + rowIngestionMeters.incrementThrownAway(); } else { AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName, committerSupplier); @@ -330,7 +337,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements if (addResult.getParseException() != null) { handleParseException(addResult.getParseException()); } else { - metrics.incrementProcessed(); + rowIngestionMeters.incrementProcessed(); } } } @@ -431,6 +438,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements * Public for tests. */ @JsonIgnore + @VisibleForTesting public Firehose getFirehose() { return firehose; @@ -440,11 +448,19 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements * Public for tests. */ @JsonIgnore + @VisibleForTesting public FireDepartmentMetrics getMetrics() { return metrics; } + @JsonIgnore + @VisibleForTesting + public RowIngestionMeters getRowIngestionMeters() + { + return rowIngestionMeters; + } + @JsonProperty("spec") public RealtimeAppenderatorIngestionSpec getSpec() { @@ -462,14 +478,18 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); Map returnMap = Maps.newHashMap(); Map totalsMap = Maps.newHashMap(); + Map averagesMap = Maps.newHashMap(); - if (metricsGetter != null) { - totalsMap.put( - "buildSegments", - metricsGetter.getTotalMetrics() - ); - } + totalsMap.put( + RowIngestionMeters.BUILD_SEGMENTS, + rowIngestionMeters.getTotals() + ); + averagesMap.put( + RowIngestionMeters.BUILD_SEGMENTS, + rowIngestionMeters.getMovingAverages() + ); + returnMap.put("movingAverages", averagesMap); returnMap.put("totals", totalsMap); return Response.ok(returnMap).build(); } @@ -523,7 +543,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements Map unparseableEventsMap = Maps.newHashMap(); List buildSegmentsParseExceptionMessages = IndexTaskUtils.getMessagesFromSavedParseExceptions(savedParseExceptions); if (buildSegmentsParseExceptionMessages != null) { - unparseableEventsMap.put("buildSegments", buildSegmentsParseExceptionMessages); + unparseableEventsMap.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegmentsParseExceptionMessages); } return unparseableEventsMap; } @@ -531,21 +551,19 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements private Map getTaskCompletionRowStats() { Map metricsMap = Maps.newHashMap(); - if (metricsGetter != null) { - metricsMap.put( - "buildSegments", - metricsGetter.getTotalMetrics() - ); - } + metricsMap.put( + RowIngestionMeters.BUILD_SEGMENTS, + rowIngestionMeters.getTotals() + ); return metricsMap; } private void handleParseException(ParseException pe) { if (pe.isFromPartiallyValidRow()) { - metrics.incrementProcessedWithErrors(); + rowIngestionMeters.incrementProcessedWithError(); } else { - metrics.incrementUnparseable(); + rowIngestionMeters.incrementUnparseable(); } if (spec.getTuningConfig().isLogParseExceptions()) { @@ -556,7 +574,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements savedParseExceptions.add(pe); } - if (metrics.unparseable() + metrics.processedWithErrors() + if (rowIngestionMeters.getUnparseable() + rowIngestionMeters.getProcessedWithError() > spec.getTuningConfig().getMaxParseExceptions()) { log.error("Max parse exceptions exceeded, terminating task..."); throw new RuntimeException("Max parse exceptions exceeded, terminating task..."); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java index 85d2e333cf3..4ce297a642a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java @@ -44,6 +44,7 @@ import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentListUsedAction; import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.stats.RowIngestionMetersFactory; import io.druid.indexing.common.task.IndexTask.IndexIOConfig; import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec; import io.druid.indexing.common.task.IndexTask.IndexTuningConfig; @@ -66,6 +67,7 @@ import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec; import io.druid.segment.indexing.granularity.GranularitySpec; import io.druid.segment.loading.SegmentLoadingException; +import io.druid.segment.realtime.firehose.ChatHandlerProvider; import io.druid.server.security.AuthorizerMapper; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineObjectHolder; @@ -108,6 +110,12 @@ public class CompactionTask extends AbstractTask @JsonIgnore private final AuthorizerMapper authorizerMapper; + @JsonIgnore + private final ChatHandlerProvider chatHandlerProvider; + + @JsonIgnore + private final RowIngestionMetersFactory rowIngestionMetersFactory; + @JsonCreator public CompactionTask( @JsonProperty("id") final String id, @@ -119,7 +127,9 @@ public class CompactionTask extends AbstractTask @Nullable @JsonProperty("tuningConfig") final IndexTuningConfig tuningConfig, @Nullable @JsonProperty("context") final Map context, @JacksonInject ObjectMapper jsonMapper, - @JacksonInject AuthorizerMapper authorizerMapper + @JacksonInject AuthorizerMapper authorizerMapper, + @JacksonInject ChatHandlerProvider chatHandlerProvider, + @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory ) { super(getOrMakeId(id, TYPE, dataSource), null, taskResource, dataSource, context); @@ -133,6 +143,8 @@ public class CompactionTask extends AbstractTask this.jsonMapper = jsonMapper; this.segmentProvider = segments == null ? new SegmentProvider(dataSource, interval) : new SegmentProvider(segments); this.authorizerMapper = authorizerMapper; + this.chatHandlerProvider = chatHandlerProvider; + this.rowIngestionMetersFactory = rowIngestionMetersFactory; } @JsonProperty @@ -206,7 +218,8 @@ public class CompactionTask extends AbstractTask ingestionSpec, getContext(), authorizerMapper, - null + chatHandlerProvider, + rowIngestionMetersFactory ); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java index 984a9fd6a52..74d65106f4b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java @@ -49,6 +49,7 @@ import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.LockAcquireAction; import io.druid.indexing.common.actions.LockTryAcquireAction; import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.stats.RowIngestionMeters; import io.druid.indexing.hadoop.OverlordActionBasedUsedSegmentLister; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.JodaUtils; @@ -424,11 +425,11 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler Map totalsMap = Maps.newHashMap(); if (determinePartitionsStatsGetter != null) { - totalsMap.put("determinePartitions", determinePartitionsStatsGetter.getTotalMetrics()); + totalsMap.put(RowIngestionMeters.DETERMINE_PARTITIONS, determinePartitionsStatsGetter.getTotalMetrics()); } if (buildSegmentsStatsGetter != null) { - totalsMap.put("buildSegments", buildSegmentsStatsGetter.getTotalMetrics()); + totalsMap.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegmentsStatsGetter.getTotalMetrics()); } returnMap.put("totals", totalsMap); @@ -455,13 +456,13 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler Map metrics = Maps.newHashMap(); if (determineConfigStatus != null) { metrics.put( - "determinePartitions", + RowIngestionMeters.DETERMINE_PARTITIONS, determineConfigStatus.getMetrics() ); } if (buildSegmentsStatus != null) { metrics.put( - "buildSegments", + RowIngestionMeters.BUILD_SEGMENTS, buildSegmentsStatus.getMetrics() ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index ec9ab1b2608..8d7d4dd01d9 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -41,7 +41,6 @@ import io.druid.data.input.InputRow; import io.druid.data.input.Rows; import io.druid.hll.HyperLogLogCollector; import io.druid.indexer.IngestionState; -import io.druid.indexer.TaskMetricsGetter; import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; import io.druid.indexing.common.IngestionStatsAndErrorsTaskReport; @@ -53,6 +52,9 @@ import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.stats.RowIngestionMeters; +import io.druid.indexing.common.stats.RowIngestionMetersFactory; +import io.druid.indexing.common.stats.TaskRealtimeMetricsMonitor; import io.druid.indexing.firehose.IngestSegmentFirehoseFactory; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Intervals; @@ -71,8 +73,6 @@ import io.druid.segment.indexing.TuningConfig; import io.druid.segment.indexing.granularity.GranularitySpec; import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.FireDepartmentMetrics; -import io.druid.segment.realtime.FireDepartmentMetricsTaskMetricsGetter; -import io.druid.segment.realtime.RealtimeMetricsMonitor; import io.druid.segment.realtime.appenderator.Appenderator; import io.druid.segment.realtime.appenderator.AppenderatorConfig; import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; @@ -164,24 +164,21 @@ public class IndexTask extends AbstractTask implements ChatHandler @JsonIgnore private FireDepartmentMetrics buildSegmentsFireDepartmentMetrics; - @JsonIgnore - private TaskMetricsGetter buildSegmentsMetricsGetter; - @JsonIgnore private CircularBuffer buildSegmentsSavedParseExceptions; - @JsonIgnore - private FireDepartmentMetrics determinePartitionsFireDepartmentMetrics; - - @JsonIgnore - private TaskMetricsGetter determinePartitionsMetricsGetter; - @JsonIgnore private CircularBuffer determinePartitionsSavedParseExceptions; @JsonIgnore private String errorMsg; + @JsonIgnore + private final RowIngestionMeters determinePartitionsMeters; + + @JsonIgnore + private final RowIngestionMeters buildSegmentsMeters; + @JsonCreator public IndexTask( @JsonProperty("id") final String id, @@ -189,7 +186,8 @@ public class IndexTask extends AbstractTask implements ChatHandler @JsonProperty("spec") final IndexIngestionSpec ingestionSchema, @JsonProperty("context") final Map context, @JacksonInject AuthorizerMapper authorizerMapper, - @JacksonInject ChatHandlerProvider chatHandlerProvider + @JacksonInject ChatHandlerProvider chatHandlerProvider, + @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory ) { this( @@ -200,7 +198,8 @@ public class IndexTask extends AbstractTask implements ChatHandler ingestionSchema, context, authorizerMapper, - chatHandlerProvider + chatHandlerProvider, + rowIngestionMetersFactory ); } @@ -212,7 +211,8 @@ public class IndexTask extends AbstractTask implements ChatHandler IndexIngestionSpec ingestionSchema, Map context, AuthorizerMapper authorizerMapper, - ChatHandlerProvider chatHandlerProvider + ChatHandlerProvider chatHandlerProvider, + RowIngestionMetersFactory rowIngestionMetersFactory ) { super( @@ -235,6 +235,8 @@ public class IndexTask extends AbstractTask implements ChatHandler ); } this.ingestionState = IngestionState.NOT_STARTED; + this.determinePartitionsMeters = rowIngestionMetersFactory.createRowIngestionMeters(); + this.buildSegmentsMeters = rowIngestionMetersFactory.createRowIngestionMeters(); } @Override @@ -310,14 +312,14 @@ public class IndexTask extends AbstractTask implements ChatHandler if (needsDeterminePartitions) { events.put( - "determinePartitions", + RowIngestionMeters.DETERMINE_PARTITIONS, IndexTaskUtils.getMessagesFromSavedParseExceptions(determinePartitionsSavedParseExceptions) ); } if (needsBuildSegments) { events.put( - "buildSegments", + RowIngestionMeters.BUILD_SEGMENTS, IndexTaskUtils.getMessagesFromSavedParseExceptions(buildSegmentsSavedParseExceptions) ); } @@ -336,6 +338,7 @@ public class IndexTask extends AbstractTask implements ChatHandler IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper); Map returnMap = Maps.newHashMap(); Map totalsMap = Maps.newHashMap(); + Map averagesMap = Maps.newHashMap(); boolean needsDeterminePartitions = false; boolean needsBuildSegments = false; @@ -358,24 +361,29 @@ public class IndexTask extends AbstractTask implements ChatHandler } if (needsDeterminePartitions) { - if (determinePartitionsMetricsGetter != null) { - totalsMap.put( - "determinePartitions", - determinePartitionsMetricsGetter.getTotalMetrics() - ); - } + totalsMap.put( + RowIngestionMeters.DETERMINE_PARTITIONS, + determinePartitionsMeters.getTotals() + ); + averagesMap.put( + RowIngestionMeters.DETERMINE_PARTITIONS, + determinePartitionsMeters.getMovingAverages() + ); } if (needsBuildSegments) { - if (buildSegmentsMetricsGetter != null) { - totalsMap.put( - "buildSegments", - buildSegmentsMetricsGetter.getTotalMetrics() - ); - } + totalsMap.put( + RowIngestionMeters.BUILD_SEGMENTS, + buildSegmentsMeters.getTotals() + ); + averagesMap.put( + RowIngestionMeters.BUILD_SEGMENTS, + buildSegmentsMeters.getMovingAverages() + ); } returnMap.put("totals", totalsMap); + returnMap.put("movingAverages", averagesMap); return Response.ok(returnMap).build(); } @@ -486,8 +494,8 @@ public class IndexTask extends AbstractTask implements ChatHandler buildSegmentsSavedParseExceptions); if (determinePartitionsParseExceptionMessages != null || buildSegmentsParseExceptionMessages != null) { - unparseableEventsMap.put("determinePartitions", determinePartitionsParseExceptionMessages); - unparseableEventsMap.put("buildSegments", buildSegmentsParseExceptionMessages); + unparseableEventsMap.put(RowIngestionMeters.DETERMINE_PARTITIONS, determinePartitionsParseExceptionMessages); + unparseableEventsMap.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegmentsParseExceptionMessages); } return unparseableEventsMap; @@ -496,18 +504,16 @@ public class IndexTask extends AbstractTask implements ChatHandler private Map getTaskCompletionRowStats() { Map metrics = Maps.newHashMap(); - if (determinePartitionsMetricsGetter != null) { - metrics.put( - "determinePartitions", - determinePartitionsMetricsGetter.getTotalMetrics() - ); - } - if (buildSegmentsMetricsGetter != null) { - metrics.put( - "buildSegments", - buildSegmentsMetricsGetter.getTotalMetrics() - ); - } + metrics.put( + RowIngestionMeters.DETERMINE_PARTITIONS, + determinePartitionsMeters.getTotals() + ); + + metrics.put( + RowIngestionMeters.BUILD_SEGMENTS, + buildSegmentsMeters.getTotals() + ); + return metrics; } @@ -699,11 +705,6 @@ public class IndexTask extends AbstractTask implements ChatHandler boolean determineNumPartitions ) throws IOException { - determinePartitionsFireDepartmentMetrics = new FireDepartmentMetrics(); - determinePartitionsMetricsGetter = new FireDepartmentMetricsTaskMetricsGetter( - determinePartitionsFireDepartmentMetrics - ); - final Map> hllCollectors = new TreeMap<>( Comparators.intervalsByStartThenEnd() ); @@ -721,7 +722,7 @@ public class IndexTask extends AbstractTask implements ChatHandler // The null inputRow means the caller must skip this row. if (inputRow == null) { - determinePartitionsFireDepartmentMetrics.incrementThrownAway(); + determinePartitionsMeters.incrementThrownAway(); continue; } @@ -739,7 +740,7 @@ public class IndexTask extends AbstractTask implements ChatHandler final Optional optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp()); if (!optInterval.isPresent()) { - determinePartitionsFireDepartmentMetrics.incrementThrownAway(); + determinePartitionsMeters.incrementThrownAway(); continue; } interval = optInterval.get(); @@ -763,7 +764,7 @@ public class IndexTask extends AbstractTask implements ChatHandler hllCollectors.put(interval, Optional.absent()); } } - determinePartitionsFireDepartmentMetrics.incrementProcessed(); + determinePartitionsMeters.incrementProcessed(); } catch (ParseException e) { if (ingestionSchema.getTuningConfig().isLogParseExceptions()) { @@ -774,8 +775,8 @@ public class IndexTask extends AbstractTask implements ChatHandler determinePartitionsSavedParseExceptions.add(e); } - determinePartitionsFireDepartmentMetrics.incrementUnparseable(); - if (determinePartitionsFireDepartmentMetrics.unparseable() > ingestionSchema.getTuningConfig() + determinePartitionsMeters.incrementUnparseable(); + if (determinePartitionsMeters.getUnparseable() > ingestionSchema.getTuningConfig() .getMaxParseExceptions()) { throw new RuntimeException("Max parse exceptions exceeded, terminating task..."); } @@ -839,10 +840,13 @@ public class IndexTask extends AbstractTask implements ChatHandler dataSchema, new RealtimeIOConfig(null, null, null), null ); buildSegmentsFireDepartmentMetrics = fireDepartmentForMetrics.getMetrics(); - buildSegmentsMetricsGetter = new FireDepartmentMetricsTaskMetricsGetter(buildSegmentsFireDepartmentMetrics); if (toolbox.getMonitorScheduler() != null) { - final RealtimeMetricsMonitor metricsMonitor = TaskRealtimeMetricsMonitorBuilder.build(this, fireDepartmentForMetrics); + final TaskRealtimeMetricsMonitor metricsMonitor = TaskRealtimeMetricsMonitorBuilder.build( + this, + fireDepartmentForMetrics, + buildSegmentsMeters + ); toolbox.getMonitorScheduler().addMonitor(metricsMonitor); } @@ -924,7 +928,7 @@ public class IndexTask extends AbstractTask implements ChatHandler final InputRow inputRow = firehose.nextRow(); if (inputRow == null) { - buildSegmentsFireDepartmentMetrics.incrementThrownAway(); + buildSegmentsMeters.incrementThrownAway(); continue; } @@ -938,7 +942,7 @@ public class IndexTask extends AbstractTask implements ChatHandler final Optional optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp()); if (!optInterval.isPresent()) { - buildSegmentsFireDepartmentMetrics.incrementThrownAway(); + buildSegmentsMeters.incrementThrownAway(); continue; } @@ -974,7 +978,7 @@ public class IndexTask extends AbstractTask implements ChatHandler if (addResult.getParseException() != null) { handleParseException(addResult.getParseException()); } else { - buildSegmentsFireDepartmentMetrics.incrementProcessed(); + buildSegmentsMeters.incrementProcessed(); } } catch (ParseException e) { @@ -1002,9 +1006,9 @@ public class IndexTask extends AbstractTask implements ChatHandler } else { log.info( "Processed[%,d] events, unparseable[%,d], thrownAway[%,d].", - buildSegmentsFireDepartmentMetrics.processed(), - buildSegmentsFireDepartmentMetrics.unparseable(), - buildSegmentsFireDepartmentMetrics.thrownAway() + buildSegmentsMeters.getProcessed(), + buildSegmentsMeters.getUnparseable(), + buildSegmentsMeters.getThrownAway() ); log.info( "Published segments[%s]", Joiner.on(", ").join( @@ -1027,9 +1031,9 @@ public class IndexTask extends AbstractTask implements ChatHandler private void handleParseException(ParseException e) { if (e.isFromPartiallyValidRow()) { - buildSegmentsFireDepartmentMetrics.incrementProcessedWithErrors(); + buildSegmentsMeters.incrementProcessedWithError(); } else { - buildSegmentsFireDepartmentMetrics.incrementUnparseable(); + buildSegmentsMeters.incrementUnparseable(); } if (ingestionSchema.tuningConfig.isLogParseExceptions()) { @@ -1040,8 +1044,8 @@ public class IndexTask extends AbstractTask implements ChatHandler buildSegmentsSavedParseExceptions.add(e); } - if (buildSegmentsFireDepartmentMetrics.unparseable() - + buildSegmentsFireDepartmentMetrics.processedWithErrors() > ingestionSchema.tuningConfig.getMaxParseExceptions()) { + if (buildSegmentsMeters.getUnparseable() + + buildSegmentsMeters.getProcessedWithError() > ingestionSchema.tuningConfig.getMaxParseExceptions()) { log.error("Max parse exceptions exceeded, terminating task..."); throw new RuntimeException("Max parse exceptions exceeded, terminating task...", e); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java index 870c01d7a93..bc0dd7b0d29 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -142,6 +142,12 @@ public class SupervisorManager return supervisor == null ? Optional.absent() : Optional.fromNullable(supervisor.lhs.getStatus()); } + public Optional>> getSupervisorStats(String id) + { + Pair supervisor = supervisors.get(id); + return supervisor == null ? Optional.absent() : Optional.fromNullable(supervisor.lhs.getStats()); + } + public boolean resetSupervisor(String id, @Nullable DataSourceMetadata dataSourceMetadata) { Preconditions.checkState(started, "SupervisorManager not started"); diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorResource.java index 7a7d535da41..1518590eee3 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -188,6 +188,39 @@ public class SupervisorResource ); } + @GET + @Path("/{id}/stats") + @Produces(MediaType.APPLICATION_JSON) + @ResourceFilters(SupervisorResourceFilter.class) + public Response getAllTaskStats( + @PathParam("id") final String id + ) + { + return asLeaderWithSupervisorManager( + new Function() + { + @Override + public Response apply(SupervisorManager manager) + { + Optional>> stats = manager.getSupervisorStats(id); + if (!stats.isPresent()) { + return Response.status(Response.Status.NOT_FOUND) + .entity( + ImmutableMap.of( + "error", + StringUtils.format("[%s] does not exist", id) + ) + ) + .build(); + } + + return Response.ok(stats.get()).build(); + } + } + ); + } + + @POST @Path("/{id}/shutdown") @Produces(MediaType.APPLICATION_JSON) diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java b/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java index ad8da3bfe12..7ab075c2cee 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java @@ -24,6 +24,8 @@ import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Stopwatch; import io.druid.guice.ServerModule; +import io.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory; +import io.druid.indexing.common.stats.RowIngestionMetersFactory; import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.ISE; import io.druid.java.util.common.logger.Logger; @@ -51,6 +53,7 @@ public class TestUtils private final ObjectMapper jsonMapper; private final IndexMergerV9 indexMergerV9; private final IndexIO indexIO; + private final RowIngestionMetersFactory rowIngestionMetersFactory; public TestUtils() { @@ -74,6 +77,8 @@ public class TestUtils jsonMapper.registerModule(module); } + this.rowIngestionMetersFactory = new DropwizardRowIngestionMetersFactory(); + jsonMapper.setInjectableValues( new InjectableValues.Std() .addValue(ExprMacroTable.class.getName(), LookupEnabledTestExprMacroTable.INSTANCE) @@ -82,6 +87,7 @@ public class TestUtils .addValue(ChatHandlerProvider.class, new NoopChatHandlerProvider()) .addValue(AuthConfig.class, new AuthConfig()) .addValue(AuthorizerMapper.class, null) + .addValue(RowIngestionMetersFactory.class, rowIngestionMetersFactory) .addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT) ); } @@ -101,6 +107,11 @@ public class TestUtils return indexIO; } + public RowIngestionMetersFactory getRowIngestionMetersFactory() + { + return rowIngestionMetersFactory; + } + public static boolean conditionValid(IndexingServiceCondition condition) { return conditionValid(condition, 1000); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index 6da9807536e..0759d116938 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -46,7 +46,6 @@ import io.druid.discovery.DataNodeService; import io.druid.discovery.DruidNodeAnnouncer; import io.druid.discovery.LookupNodeService; import io.druid.indexer.IngestionState; -import io.druid.indexer.TaskMetricsUtils; import io.druid.indexer.TaskState; import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import io.druid.indexing.common.SegmentLoaderFactory; @@ -63,6 +62,8 @@ import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.index.RealtimeAppenderatorIngestionSpec; import io.druid.indexing.common.index.RealtimeAppenderatorTuningConfig; +import io.druid.indexing.common.stats.RowIngestionMeters; +import io.druid.indexing.common.stats.RowIngestionMetersFactory; import io.druid.indexing.overlord.DataSourceMetadata; import io.druid.indexing.overlord.HeapMemoryTaskStorage; import io.druid.indexing.overlord.SegmentPublishResult; @@ -269,6 +270,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest private TaskToolboxFactory taskToolboxFactory; private File baseDir; private File reportsFile; + private RowIngestionMetersFactory rowIngestionMetersFactory; @Before public void setUp() throws IOException @@ -360,9 +362,9 @@ public class AppenderatorDriverRealtimeIndexTaskTest Collection publishedSegments = awaitSegments(); // Check metrics. - Assert.assertEquals(2, task.getMetrics().processed()); - Assert.assertEquals(0, task.getMetrics().thrownAway()); - Assert.assertEquals(0, task.getMetrics().unparseable()); + Assert.assertEquals(2, task.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway()); + Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable()); // Do some queries. Assert.assertEquals(2, sumMetric(task, null, "rows")); @@ -422,9 +424,9 @@ public class AppenderatorDriverRealtimeIndexTaskTest Collection publishedSegments = awaitSegments(); // Check metrics. - Assert.assertEquals(2, task.getMetrics().processed()); - Assert.assertEquals(0, task.getMetrics().thrownAway()); - Assert.assertEquals(0, task.getMetrics().unparseable()); + Assert.assertEquals(2, task.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway()); + Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable()); // Do some queries. Assert.assertEquals(2, sumMetric(task, null, "rows")); @@ -487,9 +489,9 @@ public class AppenderatorDriverRealtimeIndexTaskTest Collection publishedSegments = awaitSegments(); // Check metrics. - Assert.assertEquals(2000, task.getMetrics().processed()); - Assert.assertEquals(0, task.getMetrics().thrownAway()); - Assert.assertEquals(0, task.getMetrics().unparseable()); + Assert.assertEquals(2000, task.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway()); + Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable()); // Do some queries. Assert.assertEquals(2000, sumMetric(task, null, "rows")); @@ -555,9 +557,9 @@ public class AppenderatorDriverRealtimeIndexTaskTest Collection publishedSegments = awaitSegments(); // Check metrics. - Assert.assertEquals(2, task.getMetrics().processed()); - Assert.assertEquals(1, task.getMetrics().thrownAway()); - Assert.assertEquals(0, task.getMetrics().unparseable()); + Assert.assertEquals(2, task.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(1, task.getRowIngestionMeters().getThrownAway()); + Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable()); // Do some queries. Assert.assertEquals(2, sumMetric(task, null, "rows")); @@ -623,7 +625,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); Map expectedUnparseables = ImmutableMap.of( - "buildSegments", + RowIngestionMeters.BUILD_SEGMENTS, Arrays.asList( "Found unparseable columns in row: [MapBasedInputRow{timestamp=1970-01-01T00:50:00.000Z, event={t=3000000, dim1=foo, met1=foo}, dimensions=[dim1, dim2, dim1t, dimLong, dimFloat]}], exceptions: [Unable to parse value[foo] for field[met1],]" ) @@ -675,10 +677,10 @@ public class AppenderatorDriverRealtimeIndexTaskTest DataSegment publishedSegment = Iterables.getOnlyElement(publishedSegments); // Check metrics. - Assert.assertEquals(2, task.getMetrics().processed()); - Assert.assertEquals(1, task.getMetrics().processedWithErrors()); - Assert.assertEquals(0, task.getMetrics().thrownAway()); - Assert.assertEquals(2, task.getMetrics().unparseable()); + Assert.assertEquals(2, task.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(1, task.getRowIngestionMeters().getProcessedWithError()); + Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway()); + Assert.assertEquals(2, task.getRowIngestionMeters().getUnparseable()); // Do some queries. Assert.assertEquals(3, sumMetric(task, null, "rows")); @@ -702,12 +704,12 @@ public class AppenderatorDriverRealtimeIndexTaskTest handOffCallbacks.clear(); Map expectedMetrics = ImmutableMap.of( - "buildSegments", + RowIngestionMeters.BUILD_SEGMENTS, ImmutableMap.of( - TaskMetricsUtils.ROWS_PROCESSED, 2, - TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS, 1, - TaskMetricsUtils.ROWS_UNPARSEABLE, 2, - TaskMetricsUtils.ROWS_THROWN_AWAY, 0 + RowIngestionMeters.PROCESSED, 2, + RowIngestionMeters.PROCESSED_WITH_ERROR, 1, + RowIngestionMeters.UNPARSEABLE, 2, + RowIngestionMeters.THROWN_AWAY, 0 ) ); @@ -767,10 +769,10 @@ public class AppenderatorDriverRealtimeIndexTaskTest DataSegment publishedSegment = Iterables.getOnlyElement(publishedSegments); // Check metrics. - Assert.assertEquals(2, task.getMetrics().processed()); - Assert.assertEquals(2, task.getMetrics().processedWithErrors()); - Assert.assertEquals(0, task.getMetrics().thrownAway()); - Assert.assertEquals(2, task.getMetrics().unparseable()); + Assert.assertEquals(2, task.getRowIngestionMeters().getProcessed()); + Assert.assertEquals(2, task.getRowIngestionMeters().getProcessedWithError()); + Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway()); + Assert.assertEquals(2, task.getRowIngestionMeters().getUnparseable()); // Do some queries. Assert.assertEquals(4, sumMetric(task, null, "rows")); @@ -794,12 +796,12 @@ public class AppenderatorDriverRealtimeIndexTaskTest handOffCallbacks.clear(); Map expectedMetrics = ImmutableMap.of( - "buildSegments", + RowIngestionMeters.BUILD_SEGMENTS, ImmutableMap.of( - TaskMetricsUtils.ROWS_PROCESSED, 2, - TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS, 2, - TaskMetricsUtils.ROWS_UNPARSEABLE, 2, - TaskMetricsUtils.ROWS_THROWN_AWAY, 0 + RowIngestionMeters.PROCESSED, 2, + RowIngestionMeters.PROCESSED_WITH_ERROR, 2, + RowIngestionMeters.UNPARSEABLE, 2, + RowIngestionMeters.THROWN_AWAY, 0 ) ); @@ -811,7 +813,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest Assert.assertEquals(expectedMetrics, reportData.getRowStats()); Map expectedUnparseables = ImmutableMap.of( - "buildSegments", + RowIngestionMeters.BUILD_SEGMENTS, Arrays.asList( "Unparseable timestamp found! Event: {dim1=foo, met1=2.0, __fail__=x}", "Found unparseable columns in row: [MapBasedInputRow{timestamp=2018-03-17T01:59:20.729Z, event={t=1521251960729, dim1=foo, dimLong=notnumber, dimFloat=notnumber, met1=foo}, dimensions=[dim1, dim2, dim1t, dimLong, dimFloat]}], exceptions: [could not convert value [notnumber] to long,could not convert value [notnumber] to float,Unable to parse value[foo] for field[met1],]", @@ -871,17 +873,17 @@ public class AppenderatorDriverRealtimeIndexTaskTest IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); Map expectedMetrics = ImmutableMap.of( - "buildSegments", + RowIngestionMeters.BUILD_SEGMENTS, ImmutableMap.of( - TaskMetricsUtils.ROWS_PROCESSED, 1, - TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS, 2, - TaskMetricsUtils.ROWS_UNPARSEABLE, 2, - TaskMetricsUtils.ROWS_THROWN_AWAY, 0 + RowIngestionMeters.PROCESSED, 1, + RowIngestionMeters.PROCESSED_WITH_ERROR, 2, + RowIngestionMeters.UNPARSEABLE, 2, + RowIngestionMeters.THROWN_AWAY, 0 ) ); Assert.assertEquals(expectedMetrics, reportData.getRowStats()); Map expectedUnparseables = ImmutableMap.of( - "buildSegments", + RowIngestionMeters.BUILD_SEGMENTS, Arrays.asList( "Unparseable timestamp found! Event: {dim1=foo, met1=2.0, __fail__=x}", "Found unparseable columns in row: [MapBasedInputRow{timestamp=2018-03-17T01:59:20.729Z, event={t=1521251960729, dim1=foo, dimLong=notnumber, dimFloat=notnumber, met1=foo}, dimensions=[dim1, dim2, dim1t, dimLong, dimFloat]}], exceptions: [could not convert value [notnumber] to long,could not convert value [notnumber] to float,Unable to parse value[foo] for field[met1],]", @@ -1123,17 +1125,17 @@ public class AppenderatorDriverRealtimeIndexTaskTest // Wait for the task to finish. TaskStatus status = statusFuture.get(); - IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); - Map expectedMetrics = ImmutableMap.of( - "buildSegments", + RowIngestionMeters.BUILD_SEGMENTS, ImmutableMap.of( - TaskMetricsUtils.ROWS_PROCESSED, 0, - TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS, 0, - TaskMetricsUtils.ROWS_UNPARSEABLE, 0, - TaskMetricsUtils.ROWS_THROWN_AWAY, 0 + RowIngestionMeters.PROCESSED_WITH_ERROR, 0, + RowIngestionMeters.PROCESSED, 0, + RowIngestionMeters.UNPARSEABLE, 0, + RowIngestionMeters.THROWN_AWAY, 0 ) ); + + IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); Assert.assertEquals(expectedMetrics, reportData.getRowStats()); Assert.assertTrue(status.getErrorMsg().contains("java.lang.IllegalArgumentException\n\tat java.nio.Buffer.position")); } @@ -1261,7 +1263,8 @@ public class AppenderatorDriverRealtimeIndexTaskTest new RealtimeAppenderatorIngestionSpec(dataSchema, realtimeIOConfig, tuningConfig), null, null, - AuthTestUtils.TEST_AUTHORIZER_MAPPER + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + rowIngestionMetersFactory ) { @Override @@ -1423,6 +1426,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest } }; final TestUtils testUtils = new TestUtils(); + rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory(); SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig() { @Override diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java index ebac68e8b88..e83999f37a9 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java @@ -44,9 +44,11 @@ import io.druid.guice.GuiceAnnotationIntrospector; import io.druid.guice.GuiceInjectableValues; import io.druid.guice.GuiceInjectors; import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.TestUtils; import io.druid.indexing.common.actions.SegmentListUsedAction; import io.druid.indexing.common.actions.TaskAction; import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.stats.RowIngestionMetersFactory; import io.druid.indexing.common.task.CompactionTask.SegmentProvider; import io.druid.indexing.common.task.IndexTask.IndexIOConfig; import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec; @@ -80,6 +82,8 @@ import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec; import io.druid.segment.loading.SegmentLoadingException; +import io.druid.segment.realtime.firehose.ChatHandlerProvider; +import io.druid.segment.realtime.firehose.NoopChatHandlerProvider; import io.druid.segment.transform.TransformingInputRowParser; import io.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import io.druid.server.security.AuthTestUtils; @@ -130,6 +134,7 @@ public class CompactionTaskTest private static Map DIMENSIONS; private static Map AGGREGATORS; private static List SEGMENTS; + private static RowIngestionMetersFactory rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); private static ObjectMapper objectMapper = setupInjectablesInObjectMapper(new DefaultObjectMapper()); private static Map segmentMap; @@ -211,6 +216,8 @@ public class CompactionTaskTest public void configure(Binder binder) { binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER); + binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider()); + binder.bind(RowIngestionMetersFactory.class).toInstance(rowIngestionMetersFactory); } } ) @@ -294,7 +301,9 @@ public class CompactionTaskTest createTuningConfig(), ImmutableMap.of("testKey", "testContext"), objectMapper, - AuthTestUtils.TEST_AUTHORIZER_MAPPER + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + null, + rowIngestionMetersFactory ); final byte[] bytes = objectMapper.writeValueAsBytes(task); final CompactionTask fromJson = objectMapper.readValue(bytes, CompactionTask.class); @@ -321,7 +330,9 @@ public class CompactionTaskTest createTuningConfig(), ImmutableMap.of("testKey", "testContext"), objectMapper, - AuthTestUtils.TEST_AUTHORIZER_MAPPER + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + null, + rowIngestionMetersFactory ); final byte[] bytes = objectMapper.writeValueAsBytes(task); final CompactionTask fromJson = objectMapper.readValue(bytes, CompactionTask.class); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index b6d72494c59..c48766d8760 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -36,7 +36,6 @@ import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.data.input.impl.StringDimensionSchema; import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.TimestampSpec; -import io.druid.indexer.TaskMetricsUtils; import io.druid.indexer.TaskState; import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import io.druid.indexing.common.TaskLock; @@ -53,6 +52,8 @@ import io.druid.indexing.common.actions.SegmentAllocateAction; import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; import io.druid.indexing.common.actions.TaskAction; import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.common.stats.RowIngestionMeters; +import io.druid.indexing.common.stats.RowIngestionMetersFactory; import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec; import io.druid.indexing.common.task.IndexTask.IndexTuningConfig; import io.druid.indexing.overlord.SegmentPublishResult; @@ -137,6 +138,7 @@ public class IndexTaskTest private IndexIO indexIO; private volatile int segmentAllocatePartitionCounter; private File reportsFile; + private RowIngestionMetersFactory rowIngestionMetersFactory; public IndexTaskTest() { @@ -144,6 +146,7 @@ public class IndexTaskTest jsonMapper = testUtils.getTestObjectMapper(); indexMergerV9 = testUtils.getTestIndexMergerV9(); indexIO = testUtils.getTestIndexIO(); + rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory(); } @Before @@ -183,7 +186,8 @@ public class IndexTaskTest ), null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null + null, + rowIngestionMetersFactory ); final List segments = runTask(indexTask).rhs; @@ -228,7 +232,8 @@ public class IndexTaskTest ), null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null + null, + rowIngestionMetersFactory ); Assert.assertEquals(indexTask.getId(), indexTask.getGroupId()); @@ -279,7 +284,8 @@ public class IndexTaskTest ), null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null + null, + rowIngestionMetersFactory ); Assert.assertEquals(indexTask.getId(), indexTask.getGroupId()); @@ -322,7 +328,8 @@ public class IndexTaskTest ), null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null + null, + rowIngestionMetersFactory ); List segments = runTask(indexTask).rhs; @@ -358,7 +365,8 @@ public class IndexTaskTest ), null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null + null, + rowIngestionMetersFactory ); final List segments = runTask(indexTask).rhs; @@ -390,7 +398,8 @@ public class IndexTaskTest ), null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null + null, + rowIngestionMetersFactory ); final List segments = runTask(indexTask).rhs; @@ -428,7 +437,8 @@ public class IndexTaskTest ), null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null + null, + rowIngestionMetersFactory ); Assert.assertEquals("index_append_test", indexTask.getGroupId()); @@ -477,7 +487,8 @@ public class IndexTaskTest ), null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null + null, + rowIngestionMetersFactory ); final List segments = runTask(indexTask).rhs; @@ -539,7 +550,8 @@ public class IndexTaskTest ), null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null + null, + rowIngestionMetersFactory ); final List segments = runTask(indexTask).rhs; @@ -590,7 +602,8 @@ public class IndexTaskTest ), null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null + null, + rowIngestionMetersFactory ); final List segments = runTask(indexTask).rhs; @@ -636,7 +649,8 @@ public class IndexTaskTest ), null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null + null, + rowIngestionMetersFactory ); final List segments = runTask(indexTask).rhs; @@ -680,7 +694,8 @@ public class IndexTaskTest ), null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null + null, + rowIngestionMetersFactory ); final List segments = runTask(indexTask).rhs; @@ -723,7 +738,8 @@ public class IndexTaskTest ), null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null + null, + rowIngestionMetersFactory ); final List segments = runTask(indexTask).rhs; @@ -800,7 +816,8 @@ public class IndexTaskTest parseExceptionIgnoreSpec, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null + null, + rowIngestionMetersFactory ); final List segments = runTask(indexTask).rhs; @@ -852,7 +869,8 @@ public class IndexTaskTest parseExceptionIgnoreSpec, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null + null, + rowIngestionMetersFactory ); TaskStatus status = runTask(indexTask).lhs; @@ -860,9 +878,9 @@ public class IndexTaskTest checkTaskStatusErrorMsgForParseExceptionsExceeded(status); Map expectedUnparseables = ImmutableMap.of( - "determinePartitions", + RowIngestionMeters.DETERMINE_PARTITIONS, new ArrayList<>(), - "buildSegments", + RowIngestionMeters.BUILD_SEGMENTS, Arrays.asList("Unparseable timestamp found! Event: {time=unparseable, d=a, val=1}") ); IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); @@ -940,7 +958,8 @@ public class IndexTaskTest parseExceptionIgnoreSpec, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null + null, + rowIngestionMetersFactory ); TaskStatus status = runTask(indexTask).lhs; @@ -950,32 +969,33 @@ public class IndexTaskTest IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); Map expectedMetrics = ImmutableMap.of( - "determinePartitions", + RowIngestionMeters.DETERMINE_PARTITIONS, ImmutableMap.of( - TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS, 0, - TaskMetricsUtils.ROWS_PROCESSED, 4, - TaskMetricsUtils.ROWS_UNPARSEABLE, 4, - TaskMetricsUtils.ROWS_THROWN_AWAY, 1 + RowIngestionMeters.PROCESSED_WITH_ERROR, 0, + RowIngestionMeters.PROCESSED, 4, + RowIngestionMeters.UNPARSEABLE, 4, + RowIngestionMeters.THROWN_AWAY, 1 ), - "buildSegments", + RowIngestionMeters.BUILD_SEGMENTS, ImmutableMap.of( - TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS, 3, - TaskMetricsUtils.ROWS_PROCESSED, 1, - TaskMetricsUtils.ROWS_UNPARSEABLE, 4, - TaskMetricsUtils.ROWS_THROWN_AWAY, 1 + + RowIngestionMeters.PROCESSED_WITH_ERROR, 3, + RowIngestionMeters.PROCESSED, 1, + RowIngestionMeters.UNPARSEABLE, 4, + RowIngestionMeters.THROWN_AWAY, 1 ) ); Assert.assertEquals(expectedMetrics, reportData.getRowStats()); Map expectedUnparseables = ImmutableMap.of( - "determinePartitions", + RowIngestionMeters.DETERMINE_PARTITIONS, Arrays.asList( "Unable to parse row [this is not JSON]", "Unparseable timestamp found! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}", "Unable to parse row [{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}]", "Unparseable timestamp found! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}" ), - "buildSegments", + RowIngestionMeters.BUILD_SEGMENTS, Arrays.asList( "Unable to parse row [this is not JSON]", "Unparseable timestamp found! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}", @@ -1061,7 +1081,8 @@ public class IndexTaskTest parseExceptionIgnoreSpec, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null + null, + rowIngestionMetersFactory ); TaskStatus status = runTask(indexTask).lhs; @@ -1071,21 +1092,28 @@ public class IndexTaskTest IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); Map expectedMetrics = ImmutableMap.of( - "buildSegments", + RowIngestionMeters.DETERMINE_PARTITIONS, ImmutableMap.of( - TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS, 0, - TaskMetricsUtils.ROWS_PROCESSED, 1, - TaskMetricsUtils.ROWS_UNPARSEABLE, 3, - TaskMetricsUtils.ROWS_THROWN_AWAY, 2 + RowIngestionMeters.PROCESSED_WITH_ERROR, 0, + RowIngestionMeters.PROCESSED, 0, + RowIngestionMeters.UNPARSEABLE, 0, + RowIngestionMeters.THROWN_AWAY, 0 + ), + RowIngestionMeters.BUILD_SEGMENTS, + ImmutableMap.of( + RowIngestionMeters.PROCESSED_WITH_ERROR, 0, + RowIngestionMeters.PROCESSED, 1, + RowIngestionMeters.UNPARSEABLE, 3, + RowIngestionMeters.THROWN_AWAY, 2 ) ); Assert.assertEquals(expectedMetrics, reportData.getRowStats()); Map expectedUnparseables = ImmutableMap.of( - "determinePartitions", + RowIngestionMeters.DETERMINE_PARTITIONS, new ArrayList<>(), - "buildSegments", + RowIngestionMeters.BUILD_SEGMENTS, Arrays.asList( "Unparseable timestamp found! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}", "Unparseable timestamp found! Event: {time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1}", @@ -1167,7 +1195,8 @@ public class IndexTaskTest parseExceptionIgnoreSpec, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null + null, + rowIngestionMetersFactory ); TaskStatus status = runTask(indexTask).lhs; @@ -1177,25 +1206,32 @@ public class IndexTaskTest IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); Map expectedMetrics = ImmutableMap.of( - "determinePartitions", + RowIngestionMeters.DETERMINE_PARTITIONS, ImmutableMap.of( - TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS, 0, - TaskMetricsUtils.ROWS_PROCESSED, 1, - TaskMetricsUtils.ROWS_UNPARSEABLE, 3, - TaskMetricsUtils.ROWS_THROWN_AWAY, 2 + RowIngestionMeters.PROCESSED_WITH_ERROR, 0, + RowIngestionMeters.PROCESSED, 1, + RowIngestionMeters.UNPARSEABLE, 3, + RowIngestionMeters.THROWN_AWAY, 2 + ), + RowIngestionMeters.BUILD_SEGMENTS, + ImmutableMap.of( + RowIngestionMeters.PROCESSED_WITH_ERROR, 0, + RowIngestionMeters.PROCESSED, 0, + RowIngestionMeters.UNPARSEABLE, 0, + RowIngestionMeters.THROWN_AWAY, 0 ) ); Assert.assertEquals(expectedMetrics, reportData.getRowStats()); Map expectedUnparseables = ImmutableMap.of( - "determinePartitions", + RowIngestionMeters.DETERMINE_PARTITIONS, Arrays.asList( "Unparseable timestamp found! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}", "Unparseable timestamp found! Event: {time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1}", "Unparseable timestamp found! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}" ), - "buildSegments", + RowIngestionMeters.BUILD_SEGMENTS, new ArrayList<>() ); @@ -1258,7 +1294,8 @@ public class IndexTaskTest parseExceptionIgnoreSpec, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null + null, + rowIngestionMetersFactory ); final List segments = runTask(indexTask).rhs; @@ -1327,7 +1364,8 @@ public class IndexTaskTest parseExceptionIgnoreSpec, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null + null, + rowIngestionMetersFactory ); TaskStatus status = runTask(indexTask).lhs; @@ -1338,9 +1376,9 @@ public class IndexTaskTest IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData(); Map expectedUnparseables = ImmutableMap.of( - "determinePartitions", + RowIngestionMeters.DETERMINE_PARTITIONS, new ArrayList<>(), - "buildSegments", + RowIngestionMeters.BUILD_SEGMENTS, Arrays.asList("Unparseable timestamp found! Event: {column_1=2014-01-01T00:00:10Z, column_2=a, column_3=1}") ); Assert.assertEquals(expectedUnparseables, reportData.getUnparseableEvents()); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 15af1eb8e83..da62059f9b4 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -30,6 +30,7 @@ import io.druid.guice.FirehoseModule; import io.druid.indexer.HadoopIOConfig; import io.druid.indexer.HadoopIngestionSpec; import io.druid.indexing.common.TestUtils; +import io.druid.indexing.common.stats.RowIngestionMetersFactory; import io.druid.indexing.common.task.IndexTask.IndexIOConfig; import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec; import io.druid.indexing.common.task.IndexTask.IndexTuningConfig; @@ -69,6 +70,7 @@ import java.util.List; public class TaskSerdeTest { private final ObjectMapper jsonMapper; + private final RowIngestionMetersFactory rowIngestionMetersFactory; private final IndexSpec indexSpec = new IndexSpec(); @Rule @@ -78,6 +80,7 @@ public class TaskSerdeTest { TestUtils testUtils = new TestUtils(); jsonMapper = testUtils.getTestObjectMapper(); + rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory(); for (final Module jacksonModule : new FirehoseModule().getJacksonModules()) { jsonMapper.registerModule(jacksonModule); @@ -214,7 +217,8 @@ public class TaskSerdeTest ), null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null + null, + rowIngestionMetersFactory ); final String json = jsonMapper.writeValueAsString(task); @@ -298,7 +302,8 @@ public class TaskSerdeTest ), null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null + null, + rowIngestionMetersFactory ); for (final Module jacksonModule : new FirehoseModule().getJacksonModules()) { diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 8b0aa420998..e7cd34dfdee 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -57,6 +57,7 @@ import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionToolbox; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.config.TaskStorageConfig; +import io.druid.indexing.common.stats.RowIngestionMetersFactory; import io.druid.indexing.common.task.AbstractFixedIntervalTask; import io.druid.indexing.common.task.IndexTask; import io.druid.indexing.common.task.IndexTask.IndexIOConfig; @@ -153,12 +154,14 @@ public class TaskLifecycleTest private static final IndexMergerV9 INDEX_MERGER_V9; private static final IndexIO INDEX_IO; private static final TestUtils TEST_UTILS; + private static final RowIngestionMetersFactory ROW_INGESTION_METERS_FACTORY; static { TEST_UTILS = new TestUtils(); MAPPER = TEST_UTILS.getTestObjectMapper(); INDEX_MERGER_V9 = TEST_UTILS.getTestIndexMergerV9(); INDEX_IO = TEST_UTILS.getTestIndexIO(); + ROW_INGESTION_METERS_FACTORY = TEST_UTILS.getRowIngestionMetersFactory(); } private static final String HEAP_TASK_STORAGE = "HeapMemoryTaskStorage"; @@ -691,7 +694,8 @@ public class TaskLifecycleTest ), null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null + null, + ROW_INGESTION_METERS_FACTORY ); final Optional preRunTaskStatus = tsqa.getStatus(indexTask.getId()); @@ -770,7 +774,8 @@ public class TaskLifecycleTest ), null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null + null, + ROW_INGESTION_METERS_FACTORY ); final TaskStatus status = runTask(indexTask); @@ -1156,7 +1161,8 @@ public class TaskLifecycleTest ), null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null + null, + ROW_INGESTION_METERS_FACTORY ); final long startTime = System.currentTimeMillis(); diff --git a/pom.xml b/pom.xml index 4dc14357f05..9bfc5780d89 100644 --- a/pom.xml +++ b/pom.xml @@ -63,6 +63,7 @@ 2.12.0 1.10.0 1.15.0 + 4.0.0 8.1.0 16.0.1 4.1.0 @@ -677,6 +678,11 @@ jna 4.5.1 + + io.dropwizard.metrics + metrics-core + ${dropwizard.metrics.version} + diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java index 681421b4800..d5efd8e7444 100644 --- a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java @@ -19,9 +19,11 @@ package io.druid.indexing.overlord.supervisor; +import com.google.common.collect.ImmutableMap; import io.druid.indexing.overlord.DataSourceMetadata; import javax.annotation.Nullable; +import java.util.Map; public interface Supervisor { @@ -37,6 +39,11 @@ public interface Supervisor SupervisorReport getStatus(); + default Map> getStats() + { + return ImmutableMap.of(); + } + void reset(DataSourceMetadata dataSourceMetadata); /** diff --git a/server/src/main/java/io/druid/segment/realtime/FireDepartmentMetricsTaskMetricsGetter.java b/server/src/main/java/io/druid/segment/realtime/FireDepartmentMetricsTaskMetricsGetter.java deleted file mode 100644 index 9c7ee60fde1..00000000000 --- a/server/src/main/java/io/druid/segment/realtime/FireDepartmentMetricsTaskMetricsGetter.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.segment.realtime; - -import com.google.common.collect.ImmutableMap; -import io.druid.indexer.TaskMetricsGetter; -import io.druid.indexer.TaskMetricsUtils; - -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -public class FireDepartmentMetricsTaskMetricsGetter implements TaskMetricsGetter -{ - public static final List KEYS = Arrays.asList( - TaskMetricsUtils.ROWS_PROCESSED, - TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS, - TaskMetricsUtils.ROWS_THROWN_AWAY, - TaskMetricsUtils.ROWS_UNPARSEABLE - ); - - private final FireDepartmentMetrics fireDepartmentMetrics; - - public FireDepartmentMetricsTaskMetricsGetter( - FireDepartmentMetrics fireDepartmentMetrics - ) - { - this.fireDepartmentMetrics = fireDepartmentMetrics; - } - - @Override - public List getKeys() - { - return KEYS; - } - - @Override - public Map getTotalMetrics() - { - return ImmutableMap.of( - TaskMetricsUtils.ROWS_PROCESSED, fireDepartmentMetrics.processed(), - TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS, fireDepartmentMetrics.processedWithErrors(), - TaskMetricsUtils.ROWS_THROWN_AWAY, fireDepartmentMetrics.thrownAway(), - TaskMetricsUtils.ROWS_UNPARSEABLE, fireDepartmentMetrics.unparseable() - ); - } -} diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java b/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java index 38f82326792..6cea578983d 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeMetricsMonitor.java @@ -33,6 +33,10 @@ import java.util.List; import java.util.Map; /** + * RealtimeMetricsMonitor is only used by RealtimeIndexTask, this monitor only supports FireDepartmentMetrics. + * New ingestion task types should support RowIngestionMeters and use TaskRealtimeMetricsMonitor instead. + * Please see the comment on RowIngestionMeters for more information regarding the relationship between + * RowIngestionMeters and FireDepartmentMetrics. */ public class RealtimeMetricsMonitor extends AbstractMonitor { diff --git a/services/src/main/java/io/druid/cli/CliMiddleManager.java b/services/src/main/java/io/druid/cli/CliMiddleManager.java index ed4d44a2762..934e63d2eeb 100644 --- a/services/src/main/java/io/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/io/druid/cli/CliMiddleManager.java @@ -24,6 +24,7 @@ import com.google.inject.Binder; import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.Provides; +import com.google.inject.multibindings.MapBinder; import com.google.inject.name.Names; import com.google.inject.util.Providers; import io.airlift.airline.Command; @@ -37,8 +38,11 @@ import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; import io.druid.guice.ManageLifecycle; +import io.druid.guice.PolyBind; import io.druid.guice.annotations.Self; import io.druid.indexing.common.config.TaskConfig; +import io.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory; +import io.druid.indexing.common.stats.RowIngestionMetersFactory; import io.druid.indexing.overlord.ForkingTaskRunner; import io.druid.indexing.overlord.TaskRunner; import io.druid.indexing.worker.Worker; @@ -92,6 +96,19 @@ public class CliMiddleManager extends ServerRunnable binder.bind(ForkingTaskRunner.class).in(LazySingleton.class); binder.bind(ChatHandlerProvider.class).toProvider(Providers.of(null)); + PolyBind.createChoice( + binder, + "druid.indexer.task.rowIngestionMeters.type", + Key.get(RowIngestionMetersFactory.class), + Key.get(DropwizardRowIngestionMetersFactory.class) + ); + final MapBinder rowIngestionMetersHandlerProviderBinder = PolyBind.optionBinder( + binder, Key.get(RowIngestionMetersFactory.class) + ); + rowIngestionMetersHandlerProviderBinder.addBinding("dropwizard") + .to(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class); + binder.bind(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class); + binder.bind(WorkerTaskMonitor.class).in(ManageLifecycle.class); binder.bind(WorkerCuratorCoordinator.class).in(ManageLifecycle.class); diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index 1faba1f4f32..3bad59440ae 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -54,6 +54,8 @@ import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionToolbox; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.config.TaskStorageConfig; +import io.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory; +import io.druid.indexing.common.stats.RowIngestionMetersFactory; import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer; import io.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer; import io.druid.indexing.overlord.ForkingTaskRunnerFactory; @@ -184,6 +186,19 @@ public class CliOverlord extends ServerRunnable binder.bind(ChatHandlerProvider.class).toProvider(Providers.of(null)); + PolyBind.createChoice( + binder, + "druid.indexer.task.rowIngestionMeters.type", + Key.get(RowIngestionMetersFactory.class), + Key.get(DropwizardRowIngestionMetersFactory.class) + ); + final MapBinder rowIngestionMetersHandlerProviderBinder = PolyBind.optionBinder( + binder, Key.get(RowIngestionMetersFactory.class) + ); + rowIngestionMetersHandlerProviderBinder.addBinding("dropwizard") + .to(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class); + binder.bind(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class); + configureTaskStorage(binder); configureAutoscale(binder); configureRunners(binder); diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index aa6f0b13583..136645d2e7a 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -62,6 +62,8 @@ import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.actions.TaskActionToolbox; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.config.TaskStorageConfig; +import io.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory; +import io.druid.indexing.common.stats.RowIngestionMetersFactory; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.HeapMemoryTaskStorage; import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; @@ -159,6 +161,19 @@ public class CliPeon extends GuiceRunnable binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1); + PolyBind.createChoice( + binder, + "druid.indexer.task.rowIngestionMeters.type", + Key.get(RowIngestionMetersFactory.class), + Key.get(DropwizardRowIngestionMetersFactory.class) + ); + final MapBinder rowIngestionMetersHandlerProviderBinder = PolyBind.optionBinder( + binder, Key.get(RowIngestionMetersFactory.class) + ); + rowIngestionMetersHandlerProviderBinder.addBinding("dropwizard") + .to(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class); + binder.bind(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class); + PolyBind.createChoice( binder, "druid.indexer.task.chathandler.type", diff --git a/services/src/main/java/io/druid/guice/RealtimeModule.java b/services/src/main/java/io/druid/guice/RealtimeModule.java index 827b94956b0..01b1ba6d6b3 100644 --- a/services/src/main/java/io/druid/guice/RealtimeModule.java +++ b/services/src/main/java/io/druid/guice/RealtimeModule.java @@ -27,6 +27,8 @@ import com.google.inject.multibindings.MapBinder; import io.druid.cli.QueryJettyServerInitializer; import io.druid.client.cache.CacheConfig; import io.druid.client.coordinator.CoordinatorClient; +import io.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory; +import io.druid.indexing.common.stats.RowIngestionMetersFactory; import io.druid.metadata.MetadataSegmentPublisher; import io.druid.query.QuerySegmentWalker; import io.druid.segment.realtime.FireDepartment; @@ -66,6 +68,19 @@ public class RealtimeModule implements Module publisherBinder.addBinding("noop").to(NoopSegmentPublisher.class).in(LazySingleton.class); publisherBinder.addBinding("metadata").to(MetadataSegmentPublisher.class).in(LazySingleton.class); + PolyBind.createChoice( + binder, + "druid.realtime.rowIngestionMeters.type", + Key.get(RowIngestionMetersFactory.class), + Key.get(DropwizardRowIngestionMetersFactory.class) + ); + final MapBinder rowIngestionMetersHandlerProviderBinder = PolyBind.optionBinder( + binder, Key.get(RowIngestionMetersFactory.class) + ); + rowIngestionMetersHandlerProviderBinder.addBinding("dropwizard") + .to(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class); + binder.bind(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class); + PolyBind.createChoice( binder, "druid.realtime.chathandler.type",