Moving averages for ingestion row stats (#5748)

* Moving averages for ingestion row stats

* PR comments

* Make RowIngestionMeters extensible

* test and checkstyle fixes

* More PR comments

* Fix metrics

* Add some comments

* PR comments

* Comments
This commit is contained in:
Jonathan Wei 2018-06-05 09:08:57 -07:00 committed by GitHub
parent 78fd27cdb2
commit 684b5d18c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 1268 additions and 421 deletions

View File

@ -152,6 +152,11 @@
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
<!-- com.lmax.disruptor is optional in log4j-core, so we explicitly include it here -->
<dependency>
<groupId>com.lmax</groupId>

View File

@ -50,7 +50,6 @@ import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.discovery.LookupNodeService;
import io.druid.indexer.IngestionState;
import io.druid.indexer.TaskMetricsGetter;
import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
@ -63,6 +62,8 @@ import io.druid.indexing.common.actions.CheckPointDataSourceMetadataAction;
import io.druid.indexing.common.actions.ResetDataSourceMetadataAction;
import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.stats.RowIngestionMeters;
import io.druid.indexing.common.stats.RowIngestionMetersFactory;
import io.druid.indexing.common.task.AbstractTask;
import io.druid.indexing.common.task.IndexTaskUtils;
import io.druid.indexing.common.task.RealtimeIndexTask;
@ -86,7 +87,6 @@ import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.FireDepartmentMetricsTaskMetricsGetter;
import io.druid.segment.realtime.appenderator.Appenderator;
import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import io.druid.segment.realtime.appenderator.Appenderators;
@ -244,6 +244,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
private final CountDownLatch waitForPublishes = new CountDownLatch(1);
private final AtomicReference<Throwable> throwableAtomicReference = new AtomicReference<>();
private final String topic;
private final RowIngestionMeters rowIngestionMeters;
private volatile CopyOnWriteArrayList<SequenceMetadata> sequences;
private ListeningExecutorService publishExecService;
@ -251,7 +252,6 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
private CircularBuffer<Throwable> savedParseExceptions;
private IngestionState ingestionState;
private TaskMetricsGetter metricsGetter;
private String errorMsg;
@JsonCreator
@ -263,7 +263,8 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
@JsonProperty("ioConfig") KafkaIOConfig ioConfig,
@JsonProperty("context") Map<String, Object> context,
@JacksonInject ChatHandlerProvider chatHandlerProvider,
@JacksonInject AuthorizerMapper authorizerMapper
@JacksonInject AuthorizerMapper authorizerMapper,
@JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory
)
{
super(
@ -284,6 +285,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
this.topic = ioConfig.getStartPartitions().getTopic();
this.sequences = new CopyOnWriteArrayList<>();
this.ingestionState = IngestionState.NOT_STARTED;
this.rowIngestionMeters = rowIngestionMetersFactory.createRowIngestionMeters();
if (context != null && context.get(KafkaSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED) != null
&& ((boolean) context.get(KafkaSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED))) {
@ -511,8 +513,8 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
null
);
fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics();
metricsGetter = new FireDepartmentMetricsTaskMetricsGetter(fireDepartmentMetrics);
toolbox.getMonitorScheduler().addMonitor(TaskRealtimeMetricsMonitorBuilder.build(this, fireDepartmentForMetrics));
toolbox.getMonitorScheduler()
.addMonitor(TaskRealtimeMetricsMonitorBuilder.build(this, fireDepartmentForMetrics, rowIngestionMeters));
LookupNodeService lookupNodeService = getContextValue(RealtimeIndexTask.CTX_KEY_LOOKUP_TIER) == null ?
toolbox.getLookupNodeService() :
@ -758,10 +760,10 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
if (addResult.getParseException() != null) {
handleParseException(addResult.getParseException(), record);
} else {
fireDepartmentMetrics.incrementProcessed();
rowIngestionMeters.incrementProcessed();
}
} else {
fireDepartmentMetrics.incrementThrownAway();
rowIngestionMeters.incrementThrownAway();
}
}
if (isPersistRequired) {
@ -950,8 +952,8 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
null
);
fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics();
metricsGetter = new FireDepartmentMetricsTaskMetricsGetter(fireDepartmentMetrics);
toolbox.getMonitorScheduler().addMonitor(TaskRealtimeMetricsMonitorBuilder.build(this, fireDepartmentForMetrics));
toolbox.getMonitorScheduler()
.addMonitor(TaskRealtimeMetricsMonitorBuilder.build(this, fireDepartmentForMetrics, rowIngestionMeters));
LookupNodeService lookupNodeService = getContextValue(RealtimeIndexTask.CTX_KEY_LOOKUP_TIER) == null ?
toolbox.getLookupNodeService() :
@ -1148,10 +1150,10 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
if (addResult.getParseException() != null) {
handleParseException(addResult.getParseException(), record);
} else {
fireDepartmentMetrics.incrementProcessed();
rowIngestionMeters.incrementProcessed();
}
} else {
fireDepartmentMetrics.incrementThrownAway();
rowIngestionMeters.incrementThrownAway();
}
}
@ -1296,9 +1298,9 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
private void handleParseException(ParseException pe, ConsumerRecord<byte[], byte[]> record)
{
if (pe.isFromPartiallyValidRow()) {
fireDepartmentMetrics.incrementProcessedWithErrors();
rowIngestionMeters.incrementProcessedWithError();
} else {
fireDepartmentMetrics.incrementUnparseable();
rowIngestionMeters.incrementUnparseable();
}
if (tuningConfig.isLogParseExceptions()) {
@ -1314,7 +1316,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
savedParseExceptions.add(pe);
}
if (fireDepartmentMetrics.unparseable() + fireDepartmentMetrics.processedWithErrors()
if (rowIngestionMeters.getUnparseable() + rowIngestionMeters.getProcessedWithError()
> tuningConfig.getMaxParseExceptions()) {
log.error("Max parse exceptions exceeded, terminating task...");
throw new RuntimeException("Max parse exceptions exceeded, terminating task...");
@ -1341,7 +1343,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
Map<String, Object> unparseableEventsMap = Maps.newHashMap();
List<String> buildSegmentsParseExceptionMessages = IndexTaskUtils.getMessagesFromSavedParseExceptions(savedParseExceptions);
if (buildSegmentsParseExceptionMessages != null) {
unparseableEventsMap.put("buildSegments", buildSegmentsParseExceptionMessages);
unparseableEventsMap.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegmentsParseExceptionMessages);
}
return unparseableEventsMap;
}
@ -1349,12 +1351,10 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
private Map<String, Object> getTaskCompletionRowStats()
{
Map<String, Object> metrics = Maps.newHashMap();
if (metricsGetter != null) {
metrics.put(
"buildSegments",
metricsGetter.getTotalMetrics()
);
}
metrics.put(
RowIngestionMeters.BUILD_SEGMENTS,
rowIngestionMeters.getTotals()
);
return metrics;
}
@ -1574,14 +1574,18 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
authorizationCheck(req, Action.READ);
Map<String, Object> returnMap = Maps.newHashMap();
Map<String, Object> totalsMap = Maps.newHashMap();
Map<String, Object> averagesMap = Maps.newHashMap();
if (metricsGetter != null) {
totalsMap.put(
"buildSegments",
metricsGetter.getTotalMetrics()
);
}
totalsMap.put(
RowIngestionMeters.BUILD_SEGMENTS,
rowIngestionMeters.getTotals()
);
averagesMap.put(
RowIngestionMeters.BUILD_SEGMENTS,
rowIngestionMeters.getMovingAverages()
);
returnMap.put("movingAverages", averagesMap);
returnMap.put("totals", totalsMap);
return Response.ok(returnMap).build();
}
@ -1885,6 +1889,12 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
return fireDepartmentMetrics;
}
@VisibleForTesting
RowIngestionMeters getRowIngestionMeters()
{
return rowIngestionMeters;
}
private boolean isPaused()
{
return status == Status.PAUSED;

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@ -39,6 +38,7 @@ import io.druid.java.util.common.IOE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.jackson.JacksonUtils;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.http.client.HttpClient;
import io.druid.java.util.http.client.Request;
@ -57,6 +57,7 @@ import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
@ -215,8 +216,10 @@ public class KafkaIndexTaskClient
return ImmutableMap.of();
}
catch (IOException | InterruptedException e) {
log.error("Exception [%s] while pausing Task [%s]", e.getMessage(), id);
throw Throwables.propagate(e);
throw new RuntimeException(
StringUtils.format("Exception [%s] while pausing Task [%s]", e.getMessage(), id),
e
);
}
}
@ -232,7 +235,7 @@ public class KafkaIndexTaskClient
return KafkaIndexTask.Status.NOT_STARTED;
}
catch (IOException e) {
throw Throwables.propagate(e);
throw new RuntimeException(e);
}
}
@ -250,10 +253,48 @@ public class KafkaIndexTaskClient
return null;
}
catch (IOException e) {
throw Throwables.propagate(e);
throw new RuntimeException(e);
}
}
public Map<String, Object> getMovingAverages(final String id)
{
log.debug("GetMovingAverages task[%s]", id);
try {
final FullResponseHolder response = submitRequest(
id,
HttpMethod.GET,
"rowStats",
null,
true
);
return response.getContent() == null || response.getContent().isEmpty()
? Collections.emptyMap()
: jsonMapper.readValue(response.getContent(), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT);
}
catch (NoTaskLocationException e) {
return Collections.emptyMap();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
public ListenableFuture<Map<String, Object>> getMovingAveragesAsync(final String id)
{
return executorService.submit(
new Callable<Map<String, Object>>()
{
@Override
public Map<String, Object> call()
{
return getMovingAverages(id);
}
}
);
}
public Map<Integer, Long> getCurrentOffsets(final String id, final boolean retry)
{
log.debug("GetCurrentOffsets task[%s] retry[%s]", id, retry);
@ -268,7 +309,7 @@ public class KafkaIndexTaskClient
return ImmutableMap.of();
}
catch (IOException e) {
throw Throwables.propagate(e);
throw new RuntimeException(e);
}
}
@ -285,7 +326,7 @@ public class KafkaIndexTaskClient
return EMPTY_TREE_MAP;
}
catch (IOException e) {
throw Throwables.propagate(e);
throw new RuntimeException(e);
}
}
@ -313,7 +354,7 @@ public class KafkaIndexTaskClient
return ImmutableMap.of();
}
catch (IOException e) {
throw Throwables.propagate(e);
throw new RuntimeException(e);
}
}
@ -341,7 +382,7 @@ public class KafkaIndexTaskClient
return false;
}
catch (IOException e) {
throw Throwables.propagate(e);
throw new RuntimeException(e);
}
}
@ -552,11 +593,17 @@ public class KafkaIndexTaskClient
log.debug("HTTP %s: %s", method.getName(), serviceUri.toString());
response = httpClient.go(request, new FullResponseHandler(StandardCharsets.UTF_8), httpTimeout).get();
}
catch (IOException | ChannelException ioce) {
throw ioce;
}
catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException(ie);
}
catch (Exception e) {
Throwables.propagateIfInstanceOf(e.getCause(), IOException.class);
Throwables.propagateIfInstanceOf(e.getCause(), ChannelException.class);
throw Throwables.propagate(e);
throw new RuntimeException(e);
}
int responseCode = response.getStatus().getCode();
@ -605,10 +652,10 @@ public class KafkaIndexTaskClient
// if retry=false, we probably aren't too concerned if the operation doesn't succeed (i.e. the request was
// for informational purposes only) so don't log a scary stack trace
log.info("submitRequest failed for [%s], with message [%s]", urlForLog, e.getMessage());
Throwables.propagate(e);
throw new RuntimeException(e);
} else if (delay == null) {
log.warn(e, "Retries exhausted for [%s], last exception:", urlForLog);
Throwables.propagate(e);
throw new RuntimeException(e);
} else {
try {
final long sleepTime = delay.getMillis();
@ -622,7 +669,9 @@ public class KafkaIndexTaskClient
Thread.sleep(sleepTime);
}
catch (InterruptedException e2) {
Throwables.propagate(e2);
Thread.currentThread().interrupt();
e.addSuppressed(e2);
throw new RuntimeException(e);
}
}
}

View File

@ -45,6 +45,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import io.druid.indexer.TaskLocation;
import io.druid.indexing.common.TaskInfoProvider;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.stats.RowIngestionMetersFactory;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource;
import io.druid.indexing.kafka.KafkaDataSourceMetadata;
@ -228,6 +229,7 @@ public class KafkaSupervisor implements Supervisor
private final String supervisorId;
private final TaskInfoProvider taskInfoProvider;
private final long futureTimeoutInSeconds; // how long to wait for async operations to complete
private final RowIngestionMetersFactory rowIngestionMetersFactory;
private final ExecutorService exec;
private final ScheduledExecutorService scheduledExec;
@ -254,7 +256,8 @@ public class KafkaSupervisor implements Supervisor
final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
final KafkaIndexTaskClientFactory taskClientFactory,
final ObjectMapper mapper,
final KafkaSupervisorSpec spec
final KafkaSupervisorSpec spec,
final RowIngestionMetersFactory rowIngestionMetersFactory
)
{
this.taskStorage = taskStorage;
@ -264,6 +267,7 @@ public class KafkaSupervisor implements Supervisor
this.spec = spec;
this.emitter = spec.getEmitter();
this.monitorSchedulerConfig = spec.getMonitorSchedulerConfig();
this.rowIngestionMetersFactory = rowIngestionMetersFactory;
this.dataSource = spec.getDataSchema().getDataSource();
this.ioConfig = spec.getIoConfig();
@ -484,6 +488,22 @@ public class KafkaSupervisor implements Supervisor
return generateReport(true);
}
@Override
public Map<String, Map<String, Object>> getStats()
{
try {
return getCurrentTotalStats();
}
catch (InterruptedException ie) {
Thread.currentThread().interrupt();
log.error(ie, "getStats() interrupted.");
throw new RuntimeException(ie);
}
catch (ExecutionException | TimeoutException eete) {
throw new RuntimeException(eete);
}
}
@Override
public void reset(DataSourceMetadata dataSourceMetadata)
{
@ -1787,7 +1807,8 @@ public class KafkaSupervisor implements Supervisor
kafkaIOConfig,
context,
null,
null
null,
rowIngestionMetersFactory
);
Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
@ -2198,4 +2219,106 @@ public class KafkaSupervisor implements Supervisor
}
};
}
/**
* Collect row ingestion stats from all tasks managed by this supervisor.
*
* @return A map of groupId->taskId->task row stats
*
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
*/
private Map<String, Map<String, Object>> getCurrentTotalStats() throws InterruptedException, ExecutionException, TimeoutException
{
Map<String, Map<String, Object>> allStats = Maps.newHashMap();
final List<ListenableFuture<StatsFromTaskResult>> futures = new ArrayList<>();
final List<Pair<Integer, String>> groupAndTaskIds = new ArrayList<>();
for (int groupId : taskGroups.keySet()) {
TaskGroup group = taskGroups.get(groupId);
for (String taskId : group.taskIds()) {
futures.add(
Futures.transform(
taskClient.getMovingAveragesAsync(taskId),
(Function<Map<String, Object>, StatsFromTaskResult>) (currentStats) -> {
return new StatsFromTaskResult(
groupId,
taskId,
currentStats
);
}
)
);
groupAndTaskIds.add(new Pair<>(groupId, taskId));
}
}
for (int groupId : pendingCompletionTaskGroups.keySet()) {
TaskGroup group = taskGroups.get(groupId);
for (String taskId : group.taskIds()) {
futures.add(
Futures.transform(
taskClient.getMovingAveragesAsync(taskId),
(Function<Map<String, Object>, StatsFromTaskResult>) (currentStats) -> {
return new StatsFromTaskResult(
groupId,
taskId,
currentStats
);
}
)
);
groupAndTaskIds.add(new Pair<>(groupId, taskId));
}
}
List<StatsFromTaskResult> results = Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
for (int i = 0; i < results.size(); i++) {
StatsFromTaskResult result = results.get(i);
if (result != null) {
Map<String, Object> groupMap = allStats.computeIfAbsent(result.getGroupId(), k -> Maps.newHashMap());
groupMap.put(result.getTaskId(), result.getStats());
} else {
Pair<Integer, String> groupAndTaskId = groupAndTaskIds.get(i);
log.error("Failed to get stats for group[%d]-task[%s]", groupAndTaskId.lhs, groupAndTaskId.rhs);
}
}
return allStats;
}
private static class StatsFromTaskResult
{
private final String groupId;
private final String taskId;
private final Map<String, Object> stats;
public StatsFromTaskResult(
int groupId,
String taskId,
Map<String, Object> stats
)
{
this.groupId = String.valueOf(groupId);
this.taskId = taskId;
this.stats = stats;
}
public String getGroupId()
{
return groupId;
}
public String getTaskId()
{
return taskId;
}
public Map<String, Object> getStats()
{
return stats;
}
}
}

View File

@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import io.druid.guice.annotations.Json;
import io.druid.indexing.common.stats.RowIngestionMetersFactory;
import io.druid.indexing.kafka.KafkaIndexTaskClientFactory;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.TaskMaster;
@ -53,6 +54,7 @@ public class KafkaSupervisorSpec implements SupervisorSpec
private final ObjectMapper mapper;
private final ServiceEmitter emitter;
private final DruidMonitorSchedulerConfig monitorSchedulerConfig;
private final RowIngestionMetersFactory rowIngestionMetersFactory;
@JsonCreator
public KafkaSupervisorSpec(
@ -66,7 +68,8 @@ public class KafkaSupervisorSpec implements SupervisorSpec
@JacksonInject KafkaIndexTaskClientFactory kafkaIndexTaskClientFactory,
@JacksonInject @Json ObjectMapper mapper,
@JacksonInject ServiceEmitter emitter,
@JacksonInject DruidMonitorSchedulerConfig monitorSchedulerConfig
@JacksonInject DruidMonitorSchedulerConfig monitorSchedulerConfig,
@JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory
)
{
this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema");
@ -106,6 +109,7 @@ public class KafkaSupervisorSpec implements SupervisorSpec
this.mapper = mapper;
this.emitter = emitter;
this.monitorSchedulerConfig = monitorSchedulerConfig;
this.rowIngestionMetersFactory = rowIngestionMetersFactory;
}
@JsonProperty
@ -157,7 +161,8 @@ public class KafkaSupervisorSpec implements SupervisorSpec
indexerMetadataStorageCoordinator,
kafkaIndexTaskClientFactory,
mapper,
this
this,
rowIngestionMetersFactory
);
}

View File

@ -50,7 +50,9 @@ import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -67,6 +69,9 @@ import static org.easymock.EasyMock.reset;
@RunWith(Parameterized.class)
public class KafkaIndexTaskClientTest extends EasyMockSupport
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
private static final ObjectMapper objectMapper = new DefaultObjectMapper();
private static final String TEST_ID = "test-id";
private static final List<String> TEST_IDS = Lists.newArrayList("test-id1", "test-id2", "test-id3", "test-id4");
@ -149,9 +154,12 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
verifyAll();
}
@Test(expected = KafkaIndexTaskClient.TaskNotRunnableException.class)
@Test
public void testTaskNotRunnableException()
{
expectedException.expect(KafkaIndexTaskClient.TaskNotRunnableException.class);
expectedException.expectMessage("Aborting request because task [test-id] is not runnable");
reset(taskInfoProvider);
expect(taskInfoProvider.getTaskLocation(TEST_ID)).andReturn(new TaskLocation(TEST_HOST, TEST_PORT, TEST_TLS_PORT))
.anyTimes();
@ -162,9 +170,12 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
verifyAll();
}
@Test(expected = RuntimeException.class)
@Test
public void testInternalServerError()
{
expectedException.expect(RuntimeException.class);
expectedException.expectMessage("io.druid.java.util.common.IOE: Received status [500]");
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.INTERNAL_SERVER_ERROR).times(2);
expect(
httpClient.go(
@ -181,9 +192,12 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
verifyAll();
}
@Test(expected = IAE.class)
@Test
public void testBadRequest()
{
expectedException.expect(IAE.class);
expectedException.expectMessage("Received 400 Bad Request with body:");
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.BAD_REQUEST).times(2);
expect(responseHolder.getContent()).andReturn("");
expect(
@ -293,9 +307,12 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
Assert.assertEquals(10, (long) results.get(1));
}
@Test(expected = RuntimeException.class)
@Test
public void testGetCurrentOffsetsWithExhaustedRetries()
{
expectedException.expect(RuntimeException.class);
expectedException.expectMessage("io.druid.java.util.common.IOE: Received status [404]");
client = new TestableKafkaIndexTaskClient(httpClient, objectMapper, taskInfoProvider, 2);
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).anyTimes();

View File

@ -39,10 +39,11 @@ import com.google.common.util.concurrent.MoreExecutors;
import io.druid.data.input.impl.FloatDimensionSchema;
import io.druid.data.input.impl.LongDimensionSchema;
import io.druid.data.input.impl.StringDimensionSchema;
import io.druid.indexer.TaskMetricsUtils;
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import io.druid.indexing.common.TaskReport;
import io.druid.indexing.common.TaskReportFileWriter;
import io.druid.indexing.common.stats.RowIngestionMeters;
import io.druid.indexing.common.stats.RowIngestionMetersFactory;
import io.druid.indexing.common.task.IndexTaskTest;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.MapCache;
@ -210,6 +211,7 @@ public class KafkaIndexTaskTest
private final boolean isIncrementalHandoffSupported;
private final Set<Integer> checkpointRequestsHash = Sets.newHashSet();
private File reportsFile;
private RowIngestionMetersFactory rowIngestionMetersFactory;
// This should be removed in versions greater that 0.12.x
// isIncrementalHandoffSupported should always be set to true in those later versions
@ -393,9 +395,9 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(3, task.getFireDepartmentMetrics().processed());
Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
@ -447,9 +449,9 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(3, task.getFireDepartmentMetrics().processed());
Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
@ -525,9 +527,9 @@ public class KafkaIndexTaskTest
));
// Check metrics
Assert.assertEquals(8, task.getFireDepartmentMetrics().processed());
Assert.assertEquals(3, task.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(1, task.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(8, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(3, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(1, task.getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
@ -615,9 +617,9 @@ public class KafkaIndexTaskTest
));
// Check metrics
Assert.assertEquals(2, task.getFireDepartmentMetrics().processed());
Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(2, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
@ -669,9 +671,9 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(3, task.getFireDepartmentMetrics().processed());
Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(2, task.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(2, task.getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
@ -723,9 +725,9 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(3, task.getFireDepartmentMetrics().processed());
Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(2, task.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(2, task.getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
@ -787,9 +789,9 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(1, task.getFireDepartmentMetrics().processed());
Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(4, task.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(1, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(4, task.getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2009/P1D", 0);
@ -835,9 +837,9 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(0, task.getFireDepartmentMetrics().processed());
Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(0, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
// Check published metadata
Assert.assertEquals(ImmutableSet.of(), publishedDescriptors());
@ -876,9 +878,9 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(3, task.getFireDepartmentMetrics().processed());
Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
@ -931,9 +933,9 @@ public class KafkaIndexTaskTest
);
// Check metrics
Assert.assertEquals(3, task.getFireDepartmentMetrics().processed());
Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
@ -986,9 +988,9 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.FAILED, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(3, task.getFireDepartmentMetrics().processed());
Assert.assertEquals(1, task.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(1, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
// Check published metadata
Assert.assertEquals(ImmutableSet.of(), publishedDescriptors());
@ -1033,10 +1035,10 @@ public class KafkaIndexTaskTest
Assert.assertEquals(null, status.getErrorMsg());
// Check metrics
Assert.assertEquals(4, task.getFireDepartmentMetrics().processed());
Assert.assertEquals(3, task.getFireDepartmentMetrics().processedWithErrors());
Assert.assertEquals(3, task.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(1, task.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(4, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(3, task.getRowIngestionMeters().getProcessedWithError());
Assert.assertEquals(3, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(1, task.getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
@ -1052,18 +1054,18 @@ public class KafkaIndexTaskTest
IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
Map<String, Object> expectedMetrics = ImmutableMap.of(
"buildSegments",
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
TaskMetricsUtils.ROWS_PROCESSED, 4,
TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS, 3,
TaskMetricsUtils.ROWS_UNPARSEABLE, 3,
TaskMetricsUtils.ROWS_THROWN_AWAY, 1
RowIngestionMeters.PROCESSED, 4,
RowIngestionMeters.PROCESSED_WITH_ERROR, 3,
RowIngestionMeters.UNPARSEABLE, 3,
RowIngestionMeters.THROWN_AWAY, 1
)
);
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
Map<String, Object> unparseableEvents = ImmutableMap.of(
"buildSegments",
RowIngestionMeters.BUILD_SEGMENTS,
Arrays.asList(
"Found unparseable columns in row: [MapBasedInputRow{timestamp=2049-01-01T00:00:00.000Z, event={timestamp=2049, dim1=f, dim2=y, dimLong=10, dimFloat=20.0, met1=notanumber}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}], exceptions: [Unable to parse value[notanumber] for field[met1],]",
"Found unparseable columns in row: [MapBasedInputRow{timestamp=2049-01-01T00:00:00.000Z, event={timestamp=2049, dim1=f, dim2=y, dimLong=10, dimFloat=notanumber, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}], exceptions: [could not convert value [notanumber] to float,]",
@ -1115,10 +1117,10 @@ public class KafkaIndexTaskTest
IndexTaskTest.checkTaskStatusErrorMsgForParseExceptionsExceeded(status);
// Check metrics
Assert.assertEquals(3, task.getFireDepartmentMetrics().processed());
Assert.assertEquals(0, task.getFireDepartmentMetrics().processedWithErrors());
Assert.assertEquals(3, task.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getProcessedWithError());
Assert.assertEquals(3, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
// Check published metadata
Assert.assertEquals(ImmutableSet.of(), publishedDescriptors());
@ -1127,18 +1129,18 @@ public class KafkaIndexTaskTest
IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
Map<String, Object> expectedMetrics = ImmutableMap.of(
"buildSegments",
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
TaskMetricsUtils.ROWS_PROCESSED, 3,
TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS, 0,
TaskMetricsUtils.ROWS_UNPARSEABLE, 3,
TaskMetricsUtils.ROWS_THROWN_AWAY, 0
RowIngestionMeters.PROCESSED, 3,
RowIngestionMeters.PROCESSED_WITH_ERROR, 0,
RowIngestionMeters.UNPARSEABLE, 3,
RowIngestionMeters.THROWN_AWAY, 0
)
);
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
Map<String, Object> unparseableEvents = ImmutableMap.of(
"buildSegments",
RowIngestionMeters.BUILD_SEGMENTS,
Arrays.asList(
"Unable to parse row [unparseable2]",
"Unable to parse row [unparseable]"
@ -1195,12 +1197,12 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future2.get().getStatusCode());
// Check metrics
Assert.assertEquals(3, task1.getFireDepartmentMetrics().processed());
Assert.assertEquals(0, task1.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(3, task2.getFireDepartmentMetrics().processed());
Assert.assertEquals(0, task2.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task2.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(3, task1.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task1.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task1.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(3, task2.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task2.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task2.getRowIngestionMeters().getThrownAway());
// Check published segments & metadata
SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
@ -1264,12 +1266,12 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.FAILED, future2.get().getStatusCode());
// Check metrics
Assert.assertEquals(3, task1.getFireDepartmentMetrics().processed());
Assert.assertEquals(0, task1.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(3, task2.getFireDepartmentMetrics().processed());
Assert.assertEquals(3, task2.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(1, task2.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(3, task1.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task1.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task1.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(3, task2.getRowIngestionMeters().getProcessed());
Assert.assertEquals(3, task2.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(1, task2.getRowIngestionMeters().getThrownAway());
// Check published segments & metadata, should all be from the first task
SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
@ -1339,12 +1341,12 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future2.get().getStatusCode());
// Check metrics
Assert.assertEquals(3, task1.getFireDepartmentMetrics().processed());
Assert.assertEquals(0, task1.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(3, task2.getFireDepartmentMetrics().processed());
Assert.assertEquals(3, task2.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(1, task2.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(3, task1.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task1.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task1.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(3, task2.getRowIngestionMeters().getProcessed());
Assert.assertEquals(3, task2.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(1, task2.getRowIngestionMeters().getThrownAway());
// Check published segments & metadata
SegmentDescriptor desc3 = SD(task2, "2011/P1D", 1);
@ -1391,9 +1393,9 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(5, task.getFireDepartmentMetrics().processed());
Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(5, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
// Check published segments & metadata
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
@ -1471,12 +1473,12 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future2.get().getStatusCode());
// Check metrics
Assert.assertEquals(3, task1.getFireDepartmentMetrics().processed());
Assert.assertEquals(0, task1.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(1, task2.getFireDepartmentMetrics().processed());
Assert.assertEquals(0, task2.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task2.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(3, task1.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task1.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task1.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(1, task2.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task2.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task2.getRowIngestionMeters().getThrownAway());
// Check published segments & metadata
SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
@ -1562,12 +1564,12 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future2.get().getStatusCode());
// Check metrics
Assert.assertEquals(2, task1.getFireDepartmentMetrics().processed());
Assert.assertEquals(0, task1.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(1, task2.getFireDepartmentMetrics().processed());
Assert.assertEquals(0, task2.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task2.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(2, task1.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task1.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task1.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(1, task2.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task2.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task2.getRowIngestionMeters().getThrownAway());
// Check published segments & metadata
SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
@ -1649,9 +1651,9 @@ public class KafkaIndexTaskTest
Assert.assertEquals(task.getEndOffsets(), task.getCurrentOffsets());
// Check metrics
Assert.assertEquals(3, task.getFireDepartmentMetrics().processed());
Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
@ -1734,9 +1736,9 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(4, task.getFireDepartmentMetrics().processed());
Assert.assertEquals(2, task.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(4, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(2, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2009/P1D", 0);
@ -1872,9 +1874,9 @@ public class KafkaIndexTaskTest
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(3, task.getFireDepartmentMetrics().processed());
Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable());
Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway());
Assert.assertEquals(3, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
@ -1992,7 +1994,8 @@ public class KafkaIndexTaskTest
ioConfig,
context,
null,
null
null,
rowIngestionMetersFactory
);
task.setPollRetryMs(POLL_RETRY_MS);
return task;
@ -2036,7 +2039,8 @@ public class KafkaIndexTaskTest
ioConfig,
context,
null,
null
null,
rowIngestionMetersFactory
);
task.setPollRetryMs(POLL_RETRY_MS);
return task;
@ -2093,6 +2097,7 @@ public class KafkaIndexTaskTest
{
directory = tempFolder.newFolder();
final TestUtils testUtils = new TestUtils();
rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory();
final ObjectMapper objectMapper = testUtils.getTestObjectMapper();
for (Module module : new KafkaIndexTaskModule().getJacksonModules()) {
objectMapper.registerModule(module);

View File

@ -34,6 +34,8 @@ import io.druid.data.input.impl.TimestampSpec;
import io.druid.indexer.TaskLocation;
import io.druid.indexing.common.TaskInfoProvider;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.stats.RowIngestionMetersFactory;
import io.druid.indexing.common.task.RealtimeIndexTask;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.kafka.KafkaDataSourceMetadata;
@ -138,6 +140,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
private KafkaIndexTaskClient taskClient;
private TaskQueue taskQueue;
private String topic;
private RowIngestionMetersFactory rowIngestionMetersFactory;
private static String getTopic()
{
@ -209,6 +212,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
);
topic = getTopic();
rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
}
@After
@ -1982,8 +1986,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskClientFactory,
objectMapper,
new NoopServiceEmitter(),
new DruidMonitorSchedulerConfig()
)
new DruidMonitorSchedulerConfig(),
rowIngestionMetersFactory
),
rowIngestionMetersFactory
);
}
@ -2050,7 +2056,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
),
ImmutableMap.<String, Object>of(),
null,
null
null,
rowIngestionMetersFactory
);
}
@ -2096,10 +2103,19 @@ public class KafkaSupervisorTest extends EasyMockSupport
IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
KafkaIndexTaskClientFactory taskClientFactory,
ObjectMapper mapper,
KafkaSupervisorSpec spec
KafkaSupervisorSpec spec,
RowIngestionMetersFactory rowIngestionMetersFactory
)
{
super(taskStorage, taskMaster, indexerMetadataStorageCoordinator, taskClientFactory, mapper, spec);
super(
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
taskClientFactory,
mapper,
spec,
rowIngestionMetersFactory
);
}
@Override

View File

@ -21,6 +21,8 @@ package io.druid.indexing.common;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.indexing.common.stats.RowIngestionMeters;
import io.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import io.druid.indexing.common.task.Task;
import io.druid.query.DruidMetrics;
import io.druid.segment.realtime.FireDepartment;
@ -40,4 +42,16 @@ public class TaskRealtimeMetricsMonitorBuilder
)
);
}
public static TaskRealtimeMetricsMonitor build(Task task, FireDepartment fireDepartment, RowIngestionMeters meters)
{
return new TaskRealtimeMetricsMonitor(
fireDepartment,
meters,
ImmutableMap.of(
DruidMetrics.TASK_ID, new String[]{task.getId()},
DruidMetrics.TASK_TYPE, new String[]{task.getType()}
)
);
}
}

View File

@ -52,8 +52,6 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
return Files.createTempDir();
}
private final int maxRowsInMemory;
private final int maxRowsPerSegment;
private final long maxBytesInMemory;

View File

@ -0,0 +1,137 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.stats;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.Maps;
import java.util.Map;
public class DropwizardRowIngestionMeters implements RowIngestionMeters
{
public static final String ONE_MINUTE_NAME = "1m";
public static final String FIVE_MINUTE_NAME = "5m";
public static final String FIFTEEN_MINUTE_NAME = "15m";
private final MetricRegistry metricRegistry;
private final Meter processed;
private final Meter processedWithError;
private final Meter unparseable;
private final Meter thrownAway;
public DropwizardRowIngestionMeters()
{
this.metricRegistry = new MetricRegistry();
this.processed = metricRegistry.meter(PROCESSED);
this.processedWithError = metricRegistry.meter(PROCESSED_WITH_ERROR);
this.unparseable = metricRegistry.meter(UNPARSEABLE);
this.thrownAway = metricRegistry.meter(THROWN_AWAY);
}
@Override
public long getProcessed()
{
return processed.getCount();
}
@Override
public void incrementProcessed()
{
processed.mark();
}
@Override
public long getProcessedWithError()
{
return processedWithError.getCount();
}
@Override
public void incrementProcessedWithError()
{
processedWithError.mark();
}
@Override
public long getUnparseable()
{
return unparseable.getCount();
}
@Override
public void incrementUnparseable()
{
unparseable.mark();
}
@Override
public long getThrownAway()
{
return thrownAway.getCount();
}
@Override
public void incrementThrownAway()
{
thrownAway.mark();
}
@Override
public RowIngestionMetersTotals getTotals()
{
return new RowIngestionMetersTotals(
processed.getCount(),
processedWithError.getCount(),
thrownAway.getCount(),
unparseable.getCount()
);
}
@Override
public Map<String, Object> getMovingAverages()
{
Map<String, Object> movingAverages = Maps.newHashMap();
Map<String, Object> oneMinute = Maps.newHashMap();
oneMinute.put(PROCESSED, processed.getOneMinuteRate());
oneMinute.put(PROCESSED_WITH_ERROR, processedWithError.getOneMinuteRate());
oneMinute.put(UNPARSEABLE, unparseable.getOneMinuteRate());
oneMinute.put(THROWN_AWAY, thrownAway.getOneMinuteRate());
Map<String, Object> fiveMinute = Maps.newHashMap();
fiveMinute.put(PROCESSED, processed.getFiveMinuteRate());
fiveMinute.put(PROCESSED_WITH_ERROR, processedWithError.getFiveMinuteRate());
fiveMinute.put(UNPARSEABLE, unparseable.getFiveMinuteRate());
fiveMinute.put(THROWN_AWAY, thrownAway.getFiveMinuteRate());
Map<String, Object> fifteenMinute = Maps.newHashMap();
fifteenMinute.put(PROCESSED, processed.getFifteenMinuteRate());
fifteenMinute.put(PROCESSED_WITH_ERROR, processedWithError.getFifteenMinuteRate());
fifteenMinute.put(UNPARSEABLE, unparseable.getFifteenMinuteRate());
fifteenMinute.put(THROWN_AWAY, thrownAway.getFifteenMinuteRate());
movingAverages.put(ONE_MINUTE_NAME, oneMinute);
movingAverages.put(FIVE_MINUTE_NAME, fiveMinute);
movingAverages.put(FIFTEEN_MINUTE_NAME, fifteenMinute);
return movingAverages;
}
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.stats;
public class DropwizardRowIngestionMetersFactory implements RowIngestionMetersFactory
{
@Override
public RowIngestionMeters createRowIngestionMeters()
{
return new DropwizardRowIngestionMeters();
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.stats;
import io.druid.guice.annotations.ExtensionPoint;
import java.util.Map;
/**
* A collection of meters for row ingestion stats, with support for moving average calculations.
* This can eventually replace FireDepartmentMetrics, but moving averages for other stats collected by
* FireDepartmentMetrics are not currently supported, so we continue to use FireDepartmentMetrics alongside
* RowIngestionMeters to avoid unnecessary overhead from maintaining these moving averages.
*/
@ExtensionPoint
public interface RowIngestionMeters
{
String BUILD_SEGMENTS = "buildSegments";
String DETERMINE_PARTITIONS = "determinePartitions";
String PROCESSED = "processed";
String PROCESSED_WITH_ERROR = "processedWithError";
String UNPARSEABLE = "unparseable";
String THROWN_AWAY = "thrownAway";
long getProcessed();
void incrementProcessed();
long getProcessedWithError();
void incrementProcessedWithError();
long getUnparseable();
void incrementUnparseable();
long getThrownAway();
void incrementThrownAway();
RowIngestionMetersTotals getTotals();
Map<String, Object> getMovingAverages();
}

View File

@ -0,0 +1,25 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.stats;
public interface RowIngestionMetersFactory
{
RowIngestionMeters createRowIngestionMeters();
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.stats;
import com.amazonaws.thirdparty.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
public class RowIngestionMetersTotals
{
private final long processed;
private final long processedWithError;
private final long thrownAway;
private final long unparseable;
@JsonCreator
public RowIngestionMetersTotals(
@JsonProperty("processed") long processed,
@JsonProperty("processedWithError") long processedWithError,
@JsonProperty("thrownAway") long thrownAway,
@JsonProperty("unparseable") long unparseable
)
{
this.processed = processed;
this.processedWithError = processedWithError;
this.thrownAway = thrownAway;
this.unparseable = unparseable;
}
@JsonProperty
public long getProcessed()
{
return processed;
}
@JsonProperty
public long getProcessedWithError()
{
return processedWithError;
}
@JsonProperty
public long getThrownAway()
{
return thrownAway;
}
@JsonProperty
public long getUnparseable()
{
return unparseable;
}
}

View File

@ -0,0 +1,122 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.stats;
import com.google.common.collect.ImmutableMap;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.java.util.emitter.service.ServiceMetricEvent;
import io.druid.java.util.metrics.AbstractMonitor;
import io.druid.java.util.metrics.MonitorUtils;
import io.druid.query.DruidMetrics;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.FireDepartmentMetrics;
import java.util.Map;
/**
* Replaces the old RealtimeMetricsMonitor for indexing tasks that use a single FireDepartment, with changes to
* read row ingestion stats from RowIngestionMeters (which supports moving averages) instead of FireDepartmentMetrics.
* See comment on RowIngestionMeters for more information regarding relationship between RowIngestionMeters and
* FireDepartmentMetrics.
*/
public class TaskRealtimeMetricsMonitor extends AbstractMonitor
{
private static final EmittingLogger log = new EmittingLogger(TaskRealtimeMetricsMonitor.class);
private final FireDepartment fireDepartment;
private final RowIngestionMeters rowIngestionMeters;
private final Map<String, String[]> dimensions;
private FireDepartmentMetrics previousFireDepartmentMetrics;
private RowIngestionMetersTotals previousRowIngestionMetersTotals;
public TaskRealtimeMetricsMonitor(
FireDepartment fireDepartment,
RowIngestionMeters rowIngestionMeters,
Map<String, String[]> dimensions
)
{
this.fireDepartment = fireDepartment;
this.rowIngestionMeters = rowIngestionMeters;
this.dimensions = ImmutableMap.copyOf(dimensions);
previousFireDepartmentMetrics = new FireDepartmentMetrics();
previousRowIngestionMetersTotals = new RowIngestionMetersTotals(0, 0, 0, 0);
}
@Override
public boolean doMonitor(ServiceEmitter emitter)
{
FireDepartmentMetrics metrics = fireDepartment.getMetrics().snapshot();
RowIngestionMetersTotals rowIngestionMetersTotals = rowIngestionMeters.getTotals();
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DATASOURCE, fireDepartment.getDataSchema().getDataSource());
MonitorUtils.addDimensionsToBuilder(builder, dimensions);
final long thrownAway = rowIngestionMetersTotals.getThrownAway() - previousRowIngestionMetersTotals.getThrownAway();
if (thrownAway > 0) {
log.warn("[%,d] events thrown away because they are outside the window period!", thrownAway);
}
emitter.emit(builder.build("ingest/events/thrownAway", thrownAway));
final long unparseable = rowIngestionMetersTotals.getUnparseable() - previousRowIngestionMetersTotals.getUnparseable();
if (unparseable > 0) {
log.error("[%,d] Unparseable events! Turn on debug logging to see exception stack trace.", unparseable);
}
emitter.emit(builder.build("ingest/events/unparseable", unparseable));
final long processedWithError = rowIngestionMetersTotals.getProcessedWithError() - previousRowIngestionMetersTotals.getProcessedWithError();
if (processedWithError > 0) {
log.error("[%,d] events processed with errors! Set logParseExceptions to true in the ingestion spec to log these errors.", processedWithError);
}
emitter.emit(builder.build("ingest/events/processedWithError", processedWithError));
emitter.emit(builder.build("ingest/events/processed", rowIngestionMetersTotals.getProcessed() - previousRowIngestionMetersTotals.getProcessed()));
final long dedup = metrics.dedup() - previousFireDepartmentMetrics.dedup();
if (dedup > 0) {
log.warn("[%,d] duplicate events!", dedup);
}
emitter.emit(builder.build("ingest/events/duplicate", dedup));
emitter.emit(builder.build("ingest/rows/output", metrics.rowOutput() - previousFireDepartmentMetrics.rowOutput()));
emitter.emit(builder.build("ingest/persists/count", metrics.numPersists() - previousFireDepartmentMetrics.numPersists()));
emitter.emit(builder.build("ingest/persists/time", metrics.persistTimeMillis() - previousFireDepartmentMetrics.persistTimeMillis()));
emitter.emit(builder.build("ingest/persists/cpu", metrics.persistCpuTime() - previousFireDepartmentMetrics.persistCpuTime()));
emitter.emit(
builder.build(
"ingest/persists/backPressure",
metrics.persistBackPressureMillis() - previousFireDepartmentMetrics.persistBackPressureMillis()
)
);
emitter.emit(builder.build("ingest/persists/failed", metrics.failedPersists() - previousFireDepartmentMetrics.failedPersists()));
emitter.emit(builder.build("ingest/handoff/failed", metrics.failedHandoffs() - previousFireDepartmentMetrics.failedHandoffs()));
emitter.emit(builder.build("ingest/merge/time", metrics.mergeTimeMillis() - previousFireDepartmentMetrics.mergeTimeMillis()));
emitter.emit(builder.build("ingest/merge/cpu", metrics.mergeCpuTime() - previousFireDepartmentMetrics.mergeCpuTime()));
emitter.emit(builder.build("ingest/handoff/count", metrics.handOffCount() - previousFireDepartmentMetrics.handOffCount()));
emitter.emit(builder.build("ingest/sink/count", metrics.sinkCount()));
emitter.emit(builder.build("ingest/events/messageGap", metrics.messageGap()));
previousRowIngestionMetersTotals = rowIngestionMetersTotals;
previousFireDepartmentMetrics = metrics;
return true;
}
}

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
@ -38,7 +39,6 @@ import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.discovery.LookupNodeService;
import io.druid.indexer.IngestionState;
import io.druid.indexer.TaskMetricsGetter;
import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
@ -51,6 +51,9 @@ import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.index.RealtimeAppenderatorIngestionSpec;
import io.druid.indexing.common.index.RealtimeAppenderatorTuningConfig;
import io.druid.indexing.common.stats.RowIngestionMeters;
import io.druid.indexing.common.stats.RowIngestionMetersFactory;
import io.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
@ -65,8 +68,6 @@ import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.FireDepartmentMetricsTaskMetricsGetter;
import io.druid.segment.realtime.RealtimeMetricsMonitor;
import io.druid.segment.realtime.appenderator.Appenderator;
import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import io.druid.segment.realtime.appenderator.Appenderators;
@ -143,7 +144,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
private volatile FireDepartmentMetrics metrics = null;
@JsonIgnore
private TaskMetricsGetter metricsGetter;
private final RowIngestionMeters rowIngestionMeters;
@JsonIgnore
private volatile boolean gracefullyStopped = false;
@ -176,7 +177,8 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
@JsonProperty("spec") RealtimeAppenderatorIngestionSpec spec,
@JsonProperty("context") Map<String, Object> context,
@JacksonInject ChatHandlerProvider chatHandlerProvider,
@JacksonInject AuthorizerMapper authorizerMapper
@JacksonInject AuthorizerMapper authorizerMapper,
@JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory
)
{
super(
@ -196,6 +198,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
}
this.ingestionState = IngestionState.NOT_STARTED;
this.rowIngestionMeters = rowIngestionMetersFactory.createRowIngestionMeters();
}
@Override
@ -248,10 +251,13 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
dataSchema, new RealtimeIOConfig(null, null, null), null
);
final RealtimeMetricsMonitor metricsMonitor = TaskRealtimeMetricsMonitorBuilder.build(this, fireDepartmentForMetrics);
final TaskRealtimeMetricsMonitor metricsMonitor = TaskRealtimeMetricsMonitorBuilder.build(
this,
fireDepartmentForMetrics,
rowIngestionMeters
);
this.metrics = fireDepartmentForMetrics.getMetrics();
metricsGetter = new FireDepartmentMetricsTaskMetricsGetter(metrics);
Supplier<Committer> committerSupplier = null;
final File firehoseTempDir = toolbox.getFirehoseTemporaryDir();
@ -269,6 +275,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
log.warn("No chat handler detected");
}
toolbox.getDataSegmentServerAnnouncer().announce();
toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
@ -309,7 +316,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
if (inputRow == null) {
log.debug("Discarded null row, considering thrownAway.");
metrics.incrementThrownAway();
rowIngestionMeters.incrementThrownAway();
} else {
AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName, committerSupplier);
@ -330,7 +337,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
if (addResult.getParseException() != null) {
handleParseException(addResult.getParseException());
} else {
metrics.incrementProcessed();
rowIngestionMeters.incrementProcessed();
}
}
}
@ -431,6 +438,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
* Public for tests.
*/
@JsonIgnore
@VisibleForTesting
public Firehose getFirehose()
{
return firehose;
@ -440,11 +448,19 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
* Public for tests.
*/
@JsonIgnore
@VisibleForTesting
public FireDepartmentMetrics getMetrics()
{
return metrics;
}
@JsonIgnore
@VisibleForTesting
public RowIngestionMeters getRowIngestionMeters()
{
return rowIngestionMeters;
}
@JsonProperty("spec")
public RealtimeAppenderatorIngestionSpec getSpec()
{
@ -462,14 +478,18 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
Map<String, Object> returnMap = Maps.newHashMap();
Map<String, Object> totalsMap = Maps.newHashMap();
Map<String, Object> averagesMap = Maps.newHashMap();
if (metricsGetter != null) {
totalsMap.put(
"buildSegments",
metricsGetter.getTotalMetrics()
);
}
totalsMap.put(
RowIngestionMeters.BUILD_SEGMENTS,
rowIngestionMeters.getTotals()
);
averagesMap.put(
RowIngestionMeters.BUILD_SEGMENTS,
rowIngestionMeters.getMovingAverages()
);
returnMap.put("movingAverages", averagesMap);
returnMap.put("totals", totalsMap);
return Response.ok(returnMap).build();
}
@ -523,7 +543,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
Map<String, Object> unparseableEventsMap = Maps.newHashMap();
List<String> buildSegmentsParseExceptionMessages = IndexTaskUtils.getMessagesFromSavedParseExceptions(savedParseExceptions);
if (buildSegmentsParseExceptionMessages != null) {
unparseableEventsMap.put("buildSegments", buildSegmentsParseExceptionMessages);
unparseableEventsMap.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegmentsParseExceptionMessages);
}
return unparseableEventsMap;
}
@ -531,21 +551,19 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
private Map<String, Object> getTaskCompletionRowStats()
{
Map<String, Object> metricsMap = Maps.newHashMap();
if (metricsGetter != null) {
metricsMap.put(
"buildSegments",
metricsGetter.getTotalMetrics()
);
}
metricsMap.put(
RowIngestionMeters.BUILD_SEGMENTS,
rowIngestionMeters.getTotals()
);
return metricsMap;
}
private void handleParseException(ParseException pe)
{
if (pe.isFromPartiallyValidRow()) {
metrics.incrementProcessedWithErrors();
rowIngestionMeters.incrementProcessedWithError();
} else {
metrics.incrementUnparseable();
rowIngestionMeters.incrementUnparseable();
}
if (spec.getTuningConfig().isLogParseExceptions()) {
@ -556,7 +574,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
savedParseExceptions.add(pe);
}
if (metrics.unparseable() + metrics.processedWithErrors()
if (rowIngestionMeters.getUnparseable() + rowIngestionMeters.getProcessedWithError()
> spec.getTuningConfig().getMaxParseExceptions()) {
log.error("Max parse exceptions exceeded, terminating task...");
throw new RuntimeException("Max parse exceptions exceeded, terminating task...");

View File

@ -44,6 +44,7 @@ import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.stats.RowIngestionMetersFactory;
import io.druid.indexing.common.task.IndexTask.IndexIOConfig;
import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
import io.druid.indexing.common.task.IndexTask.IndexTuningConfig;
@ -66,6 +67,7 @@ import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.server.security.AuthorizerMapper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
@ -108,6 +110,12 @@ public class CompactionTask extends AbstractTask
@JsonIgnore
private final AuthorizerMapper authorizerMapper;
@JsonIgnore
private final ChatHandlerProvider chatHandlerProvider;
@JsonIgnore
private final RowIngestionMetersFactory rowIngestionMetersFactory;
@JsonCreator
public CompactionTask(
@JsonProperty("id") final String id,
@ -119,7 +127,9 @@ public class CompactionTask extends AbstractTask
@Nullable @JsonProperty("tuningConfig") final IndexTuningConfig tuningConfig,
@Nullable @JsonProperty("context") final Map<String, Object> context,
@JacksonInject ObjectMapper jsonMapper,
@JacksonInject AuthorizerMapper authorizerMapper
@JacksonInject AuthorizerMapper authorizerMapper,
@JacksonInject ChatHandlerProvider chatHandlerProvider,
@JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory
)
{
super(getOrMakeId(id, TYPE, dataSource), null, taskResource, dataSource, context);
@ -133,6 +143,8 @@ public class CompactionTask extends AbstractTask
this.jsonMapper = jsonMapper;
this.segmentProvider = segments == null ? new SegmentProvider(dataSource, interval) : new SegmentProvider(segments);
this.authorizerMapper = authorizerMapper;
this.chatHandlerProvider = chatHandlerProvider;
this.rowIngestionMetersFactory = rowIngestionMetersFactory;
}
@JsonProperty
@ -206,7 +218,8 @@ public class CompactionTask extends AbstractTask
ingestionSpec,
getContext(),
authorizerMapper,
null
chatHandlerProvider,
rowIngestionMetersFactory
);
}
}

View File

@ -49,6 +49,7 @@ import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.LockTryAcquireAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.stats.RowIngestionMeters;
import io.druid.indexing.hadoop.OverlordActionBasedUsedSegmentLister;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.JodaUtils;
@ -424,11 +425,11 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
Map<String, Object> totalsMap = Maps.newHashMap();
if (determinePartitionsStatsGetter != null) {
totalsMap.put("determinePartitions", determinePartitionsStatsGetter.getTotalMetrics());
totalsMap.put(RowIngestionMeters.DETERMINE_PARTITIONS, determinePartitionsStatsGetter.getTotalMetrics());
}
if (buildSegmentsStatsGetter != null) {
totalsMap.put("buildSegments", buildSegmentsStatsGetter.getTotalMetrics());
totalsMap.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegmentsStatsGetter.getTotalMetrics());
}
returnMap.put("totals", totalsMap);
@ -455,13 +456,13 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
Map<String, Object> metrics = Maps.newHashMap();
if (determineConfigStatus != null) {
metrics.put(
"determinePartitions",
RowIngestionMeters.DETERMINE_PARTITIONS,
determineConfigStatus.getMetrics()
);
}
if (buildSegmentsStatus != null) {
metrics.put(
"buildSegments",
RowIngestionMeters.BUILD_SEGMENTS,
buildSegmentsStatus.getMetrics()
);
}

View File

@ -41,7 +41,6 @@ import io.druid.data.input.InputRow;
import io.druid.data.input.Rows;
import io.druid.hll.HyperLogLogCollector;
import io.druid.indexer.IngestionState;
import io.druid.indexer.TaskMetricsGetter;
import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
@ -53,6 +52,9 @@ import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.stats.RowIngestionMeters;
import io.druid.indexing.common.stats.RowIngestionMetersFactory;
import io.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import io.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Intervals;
@ -71,8 +73,6 @@ import io.druid.segment.indexing.TuningConfig;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.FireDepartmentMetricsTaskMetricsGetter;
import io.druid.segment.realtime.RealtimeMetricsMonitor;
import io.druid.segment.realtime.appenderator.Appenderator;
import io.druid.segment.realtime.appenderator.AppenderatorConfig;
import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
@ -164,24 +164,21 @@ public class IndexTask extends AbstractTask implements ChatHandler
@JsonIgnore
private FireDepartmentMetrics buildSegmentsFireDepartmentMetrics;
@JsonIgnore
private TaskMetricsGetter buildSegmentsMetricsGetter;
@JsonIgnore
private CircularBuffer<Throwable> buildSegmentsSavedParseExceptions;
@JsonIgnore
private FireDepartmentMetrics determinePartitionsFireDepartmentMetrics;
@JsonIgnore
private TaskMetricsGetter determinePartitionsMetricsGetter;
@JsonIgnore
private CircularBuffer<Throwable> determinePartitionsSavedParseExceptions;
@JsonIgnore
private String errorMsg;
@JsonIgnore
private final RowIngestionMeters determinePartitionsMeters;
@JsonIgnore
private final RowIngestionMeters buildSegmentsMeters;
@JsonCreator
public IndexTask(
@JsonProperty("id") final String id,
@ -189,7 +186,8 @@ public class IndexTask extends AbstractTask implements ChatHandler
@JsonProperty("spec") final IndexIngestionSpec ingestionSchema,
@JsonProperty("context") final Map<String, Object> context,
@JacksonInject AuthorizerMapper authorizerMapper,
@JacksonInject ChatHandlerProvider chatHandlerProvider
@JacksonInject ChatHandlerProvider chatHandlerProvider,
@JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory
)
{
this(
@ -200,7 +198,8 @@ public class IndexTask extends AbstractTask implements ChatHandler
ingestionSchema,
context,
authorizerMapper,
chatHandlerProvider
chatHandlerProvider,
rowIngestionMetersFactory
);
}
@ -212,7 +211,8 @@ public class IndexTask extends AbstractTask implements ChatHandler
IndexIngestionSpec ingestionSchema,
Map<String, Object> context,
AuthorizerMapper authorizerMapper,
ChatHandlerProvider chatHandlerProvider
ChatHandlerProvider chatHandlerProvider,
RowIngestionMetersFactory rowIngestionMetersFactory
)
{
super(
@ -235,6 +235,8 @@ public class IndexTask extends AbstractTask implements ChatHandler
);
}
this.ingestionState = IngestionState.NOT_STARTED;
this.determinePartitionsMeters = rowIngestionMetersFactory.createRowIngestionMeters();
this.buildSegmentsMeters = rowIngestionMetersFactory.createRowIngestionMeters();
}
@Override
@ -310,14 +312,14 @@ public class IndexTask extends AbstractTask implements ChatHandler
if (needsDeterminePartitions) {
events.put(
"determinePartitions",
RowIngestionMeters.DETERMINE_PARTITIONS,
IndexTaskUtils.getMessagesFromSavedParseExceptions(determinePartitionsSavedParseExceptions)
);
}
if (needsBuildSegments) {
events.put(
"buildSegments",
RowIngestionMeters.BUILD_SEGMENTS,
IndexTaskUtils.getMessagesFromSavedParseExceptions(buildSegmentsSavedParseExceptions)
);
}
@ -336,6 +338,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
IndexTaskUtils.datasourceAuthorizationCheck(req, Action.READ, getDataSource(), authorizerMapper);
Map<String, Object> returnMap = Maps.newHashMap();
Map<String, Object> totalsMap = Maps.newHashMap();
Map<String, Object> averagesMap = Maps.newHashMap();
boolean needsDeterminePartitions = false;
boolean needsBuildSegments = false;
@ -358,24 +361,29 @@ public class IndexTask extends AbstractTask implements ChatHandler
}
if (needsDeterminePartitions) {
if (determinePartitionsMetricsGetter != null) {
totalsMap.put(
"determinePartitions",
determinePartitionsMetricsGetter.getTotalMetrics()
);
}
totalsMap.put(
RowIngestionMeters.DETERMINE_PARTITIONS,
determinePartitionsMeters.getTotals()
);
averagesMap.put(
RowIngestionMeters.DETERMINE_PARTITIONS,
determinePartitionsMeters.getMovingAverages()
);
}
if (needsBuildSegments) {
if (buildSegmentsMetricsGetter != null) {
totalsMap.put(
"buildSegments",
buildSegmentsMetricsGetter.getTotalMetrics()
);
}
totalsMap.put(
RowIngestionMeters.BUILD_SEGMENTS,
buildSegmentsMeters.getTotals()
);
averagesMap.put(
RowIngestionMeters.BUILD_SEGMENTS,
buildSegmentsMeters.getMovingAverages()
);
}
returnMap.put("totals", totalsMap);
returnMap.put("movingAverages", averagesMap);
return Response.ok(returnMap).build();
}
@ -486,8 +494,8 @@ public class IndexTask extends AbstractTask implements ChatHandler
buildSegmentsSavedParseExceptions);
if (determinePartitionsParseExceptionMessages != null || buildSegmentsParseExceptionMessages != null) {
unparseableEventsMap.put("determinePartitions", determinePartitionsParseExceptionMessages);
unparseableEventsMap.put("buildSegments", buildSegmentsParseExceptionMessages);
unparseableEventsMap.put(RowIngestionMeters.DETERMINE_PARTITIONS, determinePartitionsParseExceptionMessages);
unparseableEventsMap.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegmentsParseExceptionMessages);
}
return unparseableEventsMap;
@ -496,18 +504,16 @@ public class IndexTask extends AbstractTask implements ChatHandler
private Map<String, Object> getTaskCompletionRowStats()
{
Map<String, Object> metrics = Maps.newHashMap();
if (determinePartitionsMetricsGetter != null) {
metrics.put(
"determinePartitions",
determinePartitionsMetricsGetter.getTotalMetrics()
);
}
if (buildSegmentsMetricsGetter != null) {
metrics.put(
"buildSegments",
buildSegmentsMetricsGetter.getTotalMetrics()
);
}
metrics.put(
RowIngestionMeters.DETERMINE_PARTITIONS,
determinePartitionsMeters.getTotals()
);
metrics.put(
RowIngestionMeters.BUILD_SEGMENTS,
buildSegmentsMeters.getTotals()
);
return metrics;
}
@ -699,11 +705,6 @@ public class IndexTask extends AbstractTask implements ChatHandler
boolean determineNumPartitions
) throws IOException
{
determinePartitionsFireDepartmentMetrics = new FireDepartmentMetrics();
determinePartitionsMetricsGetter = new FireDepartmentMetricsTaskMetricsGetter(
determinePartitionsFireDepartmentMetrics
);
final Map<Interval, Optional<HyperLogLogCollector>> hllCollectors = new TreeMap<>(
Comparators.intervalsByStartThenEnd()
);
@ -721,7 +722,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
// The null inputRow means the caller must skip this row.
if (inputRow == null) {
determinePartitionsFireDepartmentMetrics.incrementThrownAway();
determinePartitionsMeters.incrementThrownAway();
continue;
}
@ -739,7 +740,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
if (!optInterval.isPresent()) {
determinePartitionsFireDepartmentMetrics.incrementThrownAway();
determinePartitionsMeters.incrementThrownAway();
continue;
}
interval = optInterval.get();
@ -763,7 +764,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
hllCollectors.put(interval, Optional.absent());
}
}
determinePartitionsFireDepartmentMetrics.incrementProcessed();
determinePartitionsMeters.incrementProcessed();
}
catch (ParseException e) {
if (ingestionSchema.getTuningConfig().isLogParseExceptions()) {
@ -774,8 +775,8 @@ public class IndexTask extends AbstractTask implements ChatHandler
determinePartitionsSavedParseExceptions.add(e);
}
determinePartitionsFireDepartmentMetrics.incrementUnparseable();
if (determinePartitionsFireDepartmentMetrics.unparseable() > ingestionSchema.getTuningConfig()
determinePartitionsMeters.incrementUnparseable();
if (determinePartitionsMeters.getUnparseable() > ingestionSchema.getTuningConfig()
.getMaxParseExceptions()) {
throw new RuntimeException("Max parse exceptions exceeded, terminating task...");
}
@ -839,10 +840,13 @@ public class IndexTask extends AbstractTask implements ChatHandler
dataSchema, new RealtimeIOConfig(null, null, null), null
);
buildSegmentsFireDepartmentMetrics = fireDepartmentForMetrics.getMetrics();
buildSegmentsMetricsGetter = new FireDepartmentMetricsTaskMetricsGetter(buildSegmentsFireDepartmentMetrics);
if (toolbox.getMonitorScheduler() != null) {
final RealtimeMetricsMonitor metricsMonitor = TaskRealtimeMetricsMonitorBuilder.build(this, fireDepartmentForMetrics);
final TaskRealtimeMetricsMonitor metricsMonitor = TaskRealtimeMetricsMonitorBuilder.build(
this,
fireDepartmentForMetrics,
buildSegmentsMeters
);
toolbox.getMonitorScheduler().addMonitor(metricsMonitor);
}
@ -924,7 +928,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
final InputRow inputRow = firehose.nextRow();
if (inputRow == null) {
buildSegmentsFireDepartmentMetrics.incrementThrownAway();
buildSegmentsMeters.incrementThrownAway();
continue;
}
@ -938,7 +942,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
if (!optInterval.isPresent()) {
buildSegmentsFireDepartmentMetrics.incrementThrownAway();
buildSegmentsMeters.incrementThrownAway();
continue;
}
@ -974,7 +978,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
if (addResult.getParseException() != null) {
handleParseException(addResult.getParseException());
} else {
buildSegmentsFireDepartmentMetrics.incrementProcessed();
buildSegmentsMeters.incrementProcessed();
}
}
catch (ParseException e) {
@ -1002,9 +1006,9 @@ public class IndexTask extends AbstractTask implements ChatHandler
} else {
log.info(
"Processed[%,d] events, unparseable[%,d], thrownAway[%,d].",
buildSegmentsFireDepartmentMetrics.processed(),
buildSegmentsFireDepartmentMetrics.unparseable(),
buildSegmentsFireDepartmentMetrics.thrownAway()
buildSegmentsMeters.getProcessed(),
buildSegmentsMeters.getUnparseable(),
buildSegmentsMeters.getThrownAway()
);
log.info(
"Published segments[%s]", Joiner.on(", ").join(
@ -1027,9 +1031,9 @@ public class IndexTask extends AbstractTask implements ChatHandler
private void handleParseException(ParseException e)
{
if (e.isFromPartiallyValidRow()) {
buildSegmentsFireDepartmentMetrics.incrementProcessedWithErrors();
buildSegmentsMeters.incrementProcessedWithError();
} else {
buildSegmentsFireDepartmentMetrics.incrementUnparseable();
buildSegmentsMeters.incrementUnparseable();
}
if (ingestionSchema.tuningConfig.isLogParseExceptions()) {
@ -1040,8 +1044,8 @@ public class IndexTask extends AbstractTask implements ChatHandler
buildSegmentsSavedParseExceptions.add(e);
}
if (buildSegmentsFireDepartmentMetrics.unparseable()
+ buildSegmentsFireDepartmentMetrics.processedWithErrors() > ingestionSchema.tuningConfig.getMaxParseExceptions()) {
if (buildSegmentsMeters.getUnparseable()
+ buildSegmentsMeters.getProcessedWithError() > ingestionSchema.tuningConfig.getMaxParseExceptions()) {
log.error("Max parse exceptions exceeded, terminating task...");
throw new RuntimeException("Max parse exceptions exceeded, terminating task...", e);
}

View File

@ -142,6 +142,12 @@ public class SupervisorManager
return supervisor == null ? Optional.absent() : Optional.fromNullable(supervisor.lhs.getStatus());
}
public Optional<Map<String, Map<String, Object>>> getSupervisorStats(String id)
{
Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id);
return supervisor == null ? Optional.absent() : Optional.fromNullable(supervisor.lhs.getStats());
}
public boolean resetSupervisor(String id, @Nullable DataSourceMetadata dataSourceMetadata)
{
Preconditions.checkState(started, "SupervisorManager not started");

View File

@ -188,6 +188,39 @@ public class SupervisorResource
);
}
@GET
@Path("/{id}/stats")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(SupervisorResourceFilter.class)
public Response getAllTaskStats(
@PathParam("id") final String id
)
{
return asLeaderWithSupervisorManager(
new Function<SupervisorManager, Response>()
{
@Override
public Response apply(SupervisorManager manager)
{
Optional<Map<String, Map<String, Object>>> stats = manager.getSupervisorStats(id);
if (!stats.isPresent()) {
return Response.status(Response.Status.NOT_FOUND)
.entity(
ImmutableMap.of(
"error",
StringUtils.format("[%s] does not exist", id)
)
)
.build();
}
return Response.ok(stats.get()).build();
}
}
);
}
@POST
@Path("/{id}/shutdown")
@Produces(MediaType.APPLICATION_JSON)

View File

@ -24,6 +24,8 @@ import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Stopwatch;
import io.druid.guice.ServerModule;
import io.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
import io.druid.indexing.common.stats.RowIngestionMetersFactory;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
@ -51,6 +53,7 @@ public class TestUtils
private final ObjectMapper jsonMapper;
private final IndexMergerV9 indexMergerV9;
private final IndexIO indexIO;
private final RowIngestionMetersFactory rowIngestionMetersFactory;
public TestUtils()
{
@ -74,6 +77,8 @@ public class TestUtils
jsonMapper.registerModule(module);
}
this.rowIngestionMetersFactory = new DropwizardRowIngestionMetersFactory();
jsonMapper.setInjectableValues(
new InjectableValues.Std()
.addValue(ExprMacroTable.class.getName(), LookupEnabledTestExprMacroTable.INSTANCE)
@ -82,6 +87,7 @@ public class TestUtils
.addValue(ChatHandlerProvider.class, new NoopChatHandlerProvider())
.addValue(AuthConfig.class, new AuthConfig())
.addValue(AuthorizerMapper.class, null)
.addValue(RowIngestionMetersFactory.class, rowIngestionMetersFactory)
.addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT)
);
}
@ -101,6 +107,11 @@ public class TestUtils
return indexIO;
}
public RowIngestionMetersFactory getRowIngestionMetersFactory()
{
return rowIngestionMetersFactory;
}
public static boolean conditionValid(IndexingServiceCondition condition)
{
return conditionValid(condition, 1000);

View File

@ -46,7 +46,6 @@ import io.druid.discovery.DataNodeService;
import io.druid.discovery.DruidNodeAnnouncer;
import io.druid.discovery.LookupNodeService;
import io.druid.indexer.IngestionState;
import io.druid.indexer.TaskMetricsUtils;
import io.druid.indexer.TaskState;
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import io.druid.indexing.common.SegmentLoaderFactory;
@ -63,6 +62,8 @@ import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.index.RealtimeAppenderatorIngestionSpec;
import io.druid.indexing.common.index.RealtimeAppenderatorTuningConfig;
import io.druid.indexing.common.stats.RowIngestionMeters;
import io.druid.indexing.common.stats.RowIngestionMetersFactory;
import io.druid.indexing.overlord.DataSourceMetadata;
import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.SegmentPublishResult;
@ -269,6 +270,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
private TaskToolboxFactory taskToolboxFactory;
private File baseDir;
private File reportsFile;
private RowIngestionMetersFactory rowIngestionMetersFactory;
@Before
public void setUp() throws IOException
@ -360,9 +362,9 @@ public class AppenderatorDriverRealtimeIndexTaskTest
Collection<DataSegment> publishedSegments = awaitSegments();
// Check metrics.
Assert.assertEquals(2, task.getMetrics().processed());
Assert.assertEquals(0, task.getMetrics().thrownAway());
Assert.assertEquals(0, task.getMetrics().unparseable());
Assert.assertEquals(2, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
// Do some queries.
Assert.assertEquals(2, sumMetric(task, null, "rows"));
@ -422,9 +424,9 @@ public class AppenderatorDriverRealtimeIndexTaskTest
Collection<DataSegment> publishedSegments = awaitSegments();
// Check metrics.
Assert.assertEquals(2, task.getMetrics().processed());
Assert.assertEquals(0, task.getMetrics().thrownAway());
Assert.assertEquals(0, task.getMetrics().unparseable());
Assert.assertEquals(2, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
// Do some queries.
Assert.assertEquals(2, sumMetric(task, null, "rows"));
@ -487,9 +489,9 @@ public class AppenderatorDriverRealtimeIndexTaskTest
Collection<DataSegment> publishedSegments = awaitSegments();
// Check metrics.
Assert.assertEquals(2000, task.getMetrics().processed());
Assert.assertEquals(0, task.getMetrics().thrownAway());
Assert.assertEquals(0, task.getMetrics().unparseable());
Assert.assertEquals(2000, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
// Do some queries.
Assert.assertEquals(2000, sumMetric(task, null, "rows"));
@ -555,9 +557,9 @@ public class AppenderatorDriverRealtimeIndexTaskTest
Collection<DataSegment> publishedSegments = awaitSegments();
// Check metrics.
Assert.assertEquals(2, task.getMetrics().processed());
Assert.assertEquals(1, task.getMetrics().thrownAway());
Assert.assertEquals(0, task.getMetrics().unparseable());
Assert.assertEquals(2, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(1, task.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(0, task.getRowIngestionMeters().getUnparseable());
// Do some queries.
Assert.assertEquals(2, sumMetric(task, null, "rows"));
@ -623,7 +625,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
Map<String, Object> expectedUnparseables = ImmutableMap.of(
"buildSegments",
RowIngestionMeters.BUILD_SEGMENTS,
Arrays.asList(
"Found unparseable columns in row: [MapBasedInputRow{timestamp=1970-01-01T00:50:00.000Z, event={t=3000000, dim1=foo, met1=foo}, dimensions=[dim1, dim2, dim1t, dimLong, dimFloat]}], exceptions: [Unable to parse value[foo] for field[met1],]"
)
@ -675,10 +677,10 @@ public class AppenderatorDriverRealtimeIndexTaskTest
DataSegment publishedSegment = Iterables.getOnlyElement(publishedSegments);
// Check metrics.
Assert.assertEquals(2, task.getMetrics().processed());
Assert.assertEquals(1, task.getMetrics().processedWithErrors());
Assert.assertEquals(0, task.getMetrics().thrownAway());
Assert.assertEquals(2, task.getMetrics().unparseable());
Assert.assertEquals(2, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(1, task.getRowIngestionMeters().getProcessedWithError());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(2, task.getRowIngestionMeters().getUnparseable());
// Do some queries.
Assert.assertEquals(3, sumMetric(task, null, "rows"));
@ -702,12 +704,12 @@ public class AppenderatorDriverRealtimeIndexTaskTest
handOffCallbacks.clear();
Map<String, Object> expectedMetrics = ImmutableMap.of(
"buildSegments",
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
TaskMetricsUtils.ROWS_PROCESSED, 2,
TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS, 1,
TaskMetricsUtils.ROWS_UNPARSEABLE, 2,
TaskMetricsUtils.ROWS_THROWN_AWAY, 0
RowIngestionMeters.PROCESSED, 2,
RowIngestionMeters.PROCESSED_WITH_ERROR, 1,
RowIngestionMeters.UNPARSEABLE, 2,
RowIngestionMeters.THROWN_AWAY, 0
)
);
@ -767,10 +769,10 @@ public class AppenderatorDriverRealtimeIndexTaskTest
DataSegment publishedSegment = Iterables.getOnlyElement(publishedSegments);
// Check metrics.
Assert.assertEquals(2, task.getMetrics().processed());
Assert.assertEquals(2, task.getMetrics().processedWithErrors());
Assert.assertEquals(0, task.getMetrics().thrownAway());
Assert.assertEquals(2, task.getMetrics().unparseable());
Assert.assertEquals(2, task.getRowIngestionMeters().getProcessed());
Assert.assertEquals(2, task.getRowIngestionMeters().getProcessedWithError());
Assert.assertEquals(0, task.getRowIngestionMeters().getThrownAway());
Assert.assertEquals(2, task.getRowIngestionMeters().getUnparseable());
// Do some queries.
Assert.assertEquals(4, sumMetric(task, null, "rows"));
@ -794,12 +796,12 @@ public class AppenderatorDriverRealtimeIndexTaskTest
handOffCallbacks.clear();
Map<String, Object> expectedMetrics = ImmutableMap.of(
"buildSegments",
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
TaskMetricsUtils.ROWS_PROCESSED, 2,
TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS, 2,
TaskMetricsUtils.ROWS_UNPARSEABLE, 2,
TaskMetricsUtils.ROWS_THROWN_AWAY, 0
RowIngestionMeters.PROCESSED, 2,
RowIngestionMeters.PROCESSED_WITH_ERROR, 2,
RowIngestionMeters.UNPARSEABLE, 2,
RowIngestionMeters.THROWN_AWAY, 0
)
);
@ -811,7 +813,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
Map<String, Object> expectedUnparseables = ImmutableMap.of(
"buildSegments",
RowIngestionMeters.BUILD_SEGMENTS,
Arrays.asList(
"Unparseable timestamp found! Event: {dim1=foo, met1=2.0, __fail__=x}",
"Found unparseable columns in row: [MapBasedInputRow{timestamp=2018-03-17T01:59:20.729Z, event={t=1521251960729, dim1=foo, dimLong=notnumber, dimFloat=notnumber, met1=foo}, dimensions=[dim1, dim2, dim1t, dimLong, dimFloat]}], exceptions: [could not convert value [notnumber] to long,could not convert value [notnumber] to float,Unable to parse value[foo] for field[met1],]",
@ -871,17 +873,17 @@ public class AppenderatorDriverRealtimeIndexTaskTest
IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
Map<String, Object> expectedMetrics = ImmutableMap.of(
"buildSegments",
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
TaskMetricsUtils.ROWS_PROCESSED, 1,
TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS, 2,
TaskMetricsUtils.ROWS_UNPARSEABLE, 2,
TaskMetricsUtils.ROWS_THROWN_AWAY, 0
RowIngestionMeters.PROCESSED, 1,
RowIngestionMeters.PROCESSED_WITH_ERROR, 2,
RowIngestionMeters.UNPARSEABLE, 2,
RowIngestionMeters.THROWN_AWAY, 0
)
);
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
Map<String, Object> expectedUnparseables = ImmutableMap.of(
"buildSegments",
RowIngestionMeters.BUILD_SEGMENTS,
Arrays.asList(
"Unparseable timestamp found! Event: {dim1=foo, met1=2.0, __fail__=x}",
"Found unparseable columns in row: [MapBasedInputRow{timestamp=2018-03-17T01:59:20.729Z, event={t=1521251960729, dim1=foo, dimLong=notnumber, dimFloat=notnumber, met1=foo}, dimensions=[dim1, dim2, dim1t, dimLong, dimFloat]}], exceptions: [could not convert value [notnumber] to long,could not convert value [notnumber] to float,Unable to parse value[foo] for field[met1],]",
@ -1123,17 +1125,17 @@ public class AppenderatorDriverRealtimeIndexTaskTest
// Wait for the task to finish.
TaskStatus status = statusFuture.get();
IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
Map<String, Object> expectedMetrics = ImmutableMap.of(
"buildSegments",
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
TaskMetricsUtils.ROWS_PROCESSED, 0,
TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS, 0,
TaskMetricsUtils.ROWS_UNPARSEABLE, 0,
TaskMetricsUtils.ROWS_THROWN_AWAY, 0
RowIngestionMeters.PROCESSED_WITH_ERROR, 0,
RowIngestionMeters.PROCESSED, 0,
RowIngestionMeters.UNPARSEABLE, 0,
RowIngestionMeters.THROWN_AWAY, 0
)
);
IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
Assert.assertTrue(status.getErrorMsg().contains("java.lang.IllegalArgumentException\n\tat java.nio.Buffer.position"));
}
@ -1261,7 +1263,8 @@ public class AppenderatorDriverRealtimeIndexTaskTest
new RealtimeAppenderatorIngestionSpec(dataSchema, realtimeIOConfig, tuningConfig),
null,
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
rowIngestionMetersFactory
)
{
@Override
@ -1423,6 +1426,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
}
};
final TestUtils testUtils = new TestUtils();
rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory();
SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig()
{
@Override

View File

@ -44,9 +44,11 @@ import io.druid.guice.GuiceAnnotationIntrospector;
import io.druid.guice.GuiceInjectableValues;
import io.druid.guice.GuiceInjectors;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.stats.RowIngestionMetersFactory;
import io.druid.indexing.common.task.CompactionTask.SegmentProvider;
import io.druid.indexing.common.task.IndexTask.IndexIOConfig;
import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
@ -80,6 +82,8 @@ import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import io.druid.segment.transform.TransformingInputRowParser;
import io.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import io.druid.server.security.AuthTestUtils;
@ -130,6 +134,7 @@ public class CompactionTaskTest
private static Map<String, DimensionSchema> DIMENSIONS;
private static Map<String, AggregatorFactory> AGGREGATORS;
private static List<DataSegment> SEGMENTS;
private static RowIngestionMetersFactory rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
private static ObjectMapper objectMapper = setupInjectablesInObjectMapper(new DefaultObjectMapper());
private static Map<DataSegment, File> segmentMap;
@ -211,6 +216,8 @@ public class CompactionTaskTest
public void configure(Binder binder)
{
binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER);
binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider());
binder.bind(RowIngestionMetersFactory.class).toInstance(rowIngestionMetersFactory);
}
}
)
@ -294,7 +301,9 @@ public class CompactionTaskTest
createTuningConfig(),
ImmutableMap.of("testKey", "testContext"),
objectMapper,
AuthTestUtils.TEST_AUTHORIZER_MAPPER
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
rowIngestionMetersFactory
);
final byte[] bytes = objectMapper.writeValueAsBytes(task);
final CompactionTask fromJson = objectMapper.readValue(bytes, CompactionTask.class);
@ -321,7 +330,9 @@ public class CompactionTaskTest
createTuningConfig(),
ImmutableMap.of("testKey", "testContext"),
objectMapper,
AuthTestUtils.TEST_AUTHORIZER_MAPPER
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null,
rowIngestionMetersFactory
);
final byte[] bytes = objectMapper.writeValueAsBytes(task);
final CompactionTask fromJson = objectMapper.readValue(bytes, CompactionTask.class);

View File

@ -36,7 +36,6 @@ import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.data.input.impl.StringDimensionSchema;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.indexer.TaskMetricsUtils;
import io.druid.indexer.TaskState;
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import io.druid.indexing.common.TaskLock;
@ -53,6 +52,8 @@ import io.druid.indexing.common.actions.SegmentAllocateAction;
import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.stats.RowIngestionMeters;
import io.druid.indexing.common.stats.RowIngestionMetersFactory;
import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
import io.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import io.druid.indexing.overlord.SegmentPublishResult;
@ -137,6 +138,7 @@ public class IndexTaskTest
private IndexIO indexIO;
private volatile int segmentAllocatePartitionCounter;
private File reportsFile;
private RowIngestionMetersFactory rowIngestionMetersFactory;
public IndexTaskTest()
{
@ -144,6 +146,7 @@ public class IndexTaskTest
jsonMapper = testUtils.getTestObjectMapper();
indexMergerV9 = testUtils.getTestIndexMergerV9();
indexIO = testUtils.getTestIndexIO();
rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory();
}
@Before
@ -183,7 +186,8 @@ public class IndexTaskTest
),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null
null,
rowIngestionMetersFactory
);
final List<DataSegment> segments = runTask(indexTask).rhs;
@ -228,7 +232,8 @@ public class IndexTaskTest
),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null
null,
rowIngestionMetersFactory
);
Assert.assertEquals(indexTask.getId(), indexTask.getGroupId());
@ -279,7 +284,8 @@ public class IndexTaskTest
),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null
null,
rowIngestionMetersFactory
);
Assert.assertEquals(indexTask.getId(), indexTask.getGroupId());
@ -322,7 +328,8 @@ public class IndexTaskTest
),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null
null,
rowIngestionMetersFactory
);
List<DataSegment> segments = runTask(indexTask).rhs;
@ -358,7 +365,8 @@ public class IndexTaskTest
),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null
null,
rowIngestionMetersFactory
);
final List<DataSegment> segments = runTask(indexTask).rhs;
@ -390,7 +398,8 @@ public class IndexTaskTest
),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null
null,
rowIngestionMetersFactory
);
final List<DataSegment> segments = runTask(indexTask).rhs;
@ -428,7 +437,8 @@ public class IndexTaskTest
),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null
null,
rowIngestionMetersFactory
);
Assert.assertEquals("index_append_test", indexTask.getGroupId());
@ -477,7 +487,8 @@ public class IndexTaskTest
),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null
null,
rowIngestionMetersFactory
);
final List<DataSegment> segments = runTask(indexTask).rhs;
@ -539,7 +550,8 @@ public class IndexTaskTest
),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null
null,
rowIngestionMetersFactory
);
final List<DataSegment> segments = runTask(indexTask).rhs;
@ -590,7 +602,8 @@ public class IndexTaskTest
),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null
null,
rowIngestionMetersFactory
);
final List<DataSegment> segments = runTask(indexTask).rhs;
@ -636,7 +649,8 @@ public class IndexTaskTest
),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null
null,
rowIngestionMetersFactory
);
final List<DataSegment> segments = runTask(indexTask).rhs;
@ -680,7 +694,8 @@ public class IndexTaskTest
),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null
null,
rowIngestionMetersFactory
);
final List<DataSegment> segments = runTask(indexTask).rhs;
@ -723,7 +738,8 @@ public class IndexTaskTest
),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null
null,
rowIngestionMetersFactory
);
final List<DataSegment> segments = runTask(indexTask).rhs;
@ -800,7 +816,8 @@ public class IndexTaskTest
parseExceptionIgnoreSpec,
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null
null,
rowIngestionMetersFactory
);
final List<DataSegment> segments = runTask(indexTask).rhs;
@ -852,7 +869,8 @@ public class IndexTaskTest
parseExceptionIgnoreSpec,
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null
null,
rowIngestionMetersFactory
);
TaskStatus status = runTask(indexTask).lhs;
@ -860,9 +878,9 @@ public class IndexTaskTest
checkTaskStatusErrorMsgForParseExceptionsExceeded(status);
Map<String, Object> expectedUnparseables = ImmutableMap.of(
"determinePartitions",
RowIngestionMeters.DETERMINE_PARTITIONS,
new ArrayList<>(),
"buildSegments",
RowIngestionMeters.BUILD_SEGMENTS,
Arrays.asList("Unparseable timestamp found! Event: {time=unparseable, d=a, val=1}")
);
IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
@ -940,7 +958,8 @@ public class IndexTaskTest
parseExceptionIgnoreSpec,
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null
null,
rowIngestionMetersFactory
);
TaskStatus status = runTask(indexTask).lhs;
@ -950,32 +969,33 @@ public class IndexTaskTest
IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
Map<String, Object> expectedMetrics = ImmutableMap.of(
"determinePartitions",
RowIngestionMeters.DETERMINE_PARTITIONS,
ImmutableMap.of(
TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS, 0,
TaskMetricsUtils.ROWS_PROCESSED, 4,
TaskMetricsUtils.ROWS_UNPARSEABLE, 4,
TaskMetricsUtils.ROWS_THROWN_AWAY, 1
RowIngestionMeters.PROCESSED_WITH_ERROR, 0,
RowIngestionMeters.PROCESSED, 4,
RowIngestionMeters.UNPARSEABLE, 4,
RowIngestionMeters.THROWN_AWAY, 1
),
"buildSegments",
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS, 3,
TaskMetricsUtils.ROWS_PROCESSED, 1,
TaskMetricsUtils.ROWS_UNPARSEABLE, 4,
TaskMetricsUtils.ROWS_THROWN_AWAY, 1
RowIngestionMeters.PROCESSED_WITH_ERROR, 3,
RowIngestionMeters.PROCESSED, 1,
RowIngestionMeters.UNPARSEABLE, 4,
RowIngestionMeters.THROWN_AWAY, 1
)
);
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
Map<String, Object> expectedUnparseables = ImmutableMap.of(
"determinePartitions",
RowIngestionMeters.DETERMINE_PARTITIONS,
Arrays.asList(
"Unable to parse row [this is not JSON]",
"Unparseable timestamp found! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
"Unable to parse row [{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}]",
"Unparseable timestamp found! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
),
"buildSegments",
RowIngestionMeters.BUILD_SEGMENTS,
Arrays.asList(
"Unable to parse row [this is not JSON]",
"Unparseable timestamp found! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
@ -1061,7 +1081,8 @@ public class IndexTaskTest
parseExceptionIgnoreSpec,
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null
null,
rowIngestionMetersFactory
);
TaskStatus status = runTask(indexTask).lhs;
@ -1071,21 +1092,28 @@ public class IndexTaskTest
IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
Map<String, Object> expectedMetrics = ImmutableMap.of(
"buildSegments",
RowIngestionMeters.DETERMINE_PARTITIONS,
ImmutableMap.of(
TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS, 0,
TaskMetricsUtils.ROWS_PROCESSED, 1,
TaskMetricsUtils.ROWS_UNPARSEABLE, 3,
TaskMetricsUtils.ROWS_THROWN_AWAY, 2
RowIngestionMeters.PROCESSED_WITH_ERROR, 0,
RowIngestionMeters.PROCESSED, 0,
RowIngestionMeters.UNPARSEABLE, 0,
RowIngestionMeters.THROWN_AWAY, 0
),
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
RowIngestionMeters.PROCESSED_WITH_ERROR, 0,
RowIngestionMeters.PROCESSED, 1,
RowIngestionMeters.UNPARSEABLE, 3,
RowIngestionMeters.THROWN_AWAY, 2
)
);
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
Map<String, Object> expectedUnparseables = ImmutableMap.of(
"determinePartitions",
RowIngestionMeters.DETERMINE_PARTITIONS,
new ArrayList<>(),
"buildSegments",
RowIngestionMeters.BUILD_SEGMENTS,
Arrays.asList(
"Unparseable timestamp found! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
"Unparseable timestamp found! Event: {time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1}",
@ -1167,7 +1195,8 @@ public class IndexTaskTest
parseExceptionIgnoreSpec,
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null
null,
rowIngestionMetersFactory
);
TaskStatus status = runTask(indexTask).lhs;
@ -1177,25 +1206,32 @@ public class IndexTaskTest
IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
Map<String, Object> expectedMetrics = ImmutableMap.of(
"determinePartitions",
RowIngestionMeters.DETERMINE_PARTITIONS,
ImmutableMap.of(
TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS, 0,
TaskMetricsUtils.ROWS_PROCESSED, 1,
TaskMetricsUtils.ROWS_UNPARSEABLE, 3,
TaskMetricsUtils.ROWS_THROWN_AWAY, 2
RowIngestionMeters.PROCESSED_WITH_ERROR, 0,
RowIngestionMeters.PROCESSED, 1,
RowIngestionMeters.UNPARSEABLE, 3,
RowIngestionMeters.THROWN_AWAY, 2
),
RowIngestionMeters.BUILD_SEGMENTS,
ImmutableMap.of(
RowIngestionMeters.PROCESSED_WITH_ERROR, 0,
RowIngestionMeters.PROCESSED, 0,
RowIngestionMeters.UNPARSEABLE, 0,
RowIngestionMeters.THROWN_AWAY, 0
)
);
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
Map<String, Object> expectedUnparseables = ImmutableMap.of(
"determinePartitions",
RowIngestionMeters.DETERMINE_PARTITIONS,
Arrays.asList(
"Unparseable timestamp found! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
"Unparseable timestamp found! Event: {time=9.0, dim=a, dimLong=2, dimFloat=3.0, val=1}",
"Unparseable timestamp found! Event: {time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1}"
),
"buildSegments",
RowIngestionMeters.BUILD_SEGMENTS,
new ArrayList<>()
);
@ -1258,7 +1294,8 @@ public class IndexTaskTest
parseExceptionIgnoreSpec,
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null
null,
rowIngestionMetersFactory
);
final List<DataSegment> segments = runTask(indexTask).rhs;
@ -1327,7 +1364,8 @@ public class IndexTaskTest
parseExceptionIgnoreSpec,
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null
null,
rowIngestionMetersFactory
);
TaskStatus status = runTask(indexTask).lhs;
@ -1338,9 +1376,9 @@ public class IndexTaskTest
IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
Map<String, Object> expectedUnparseables = ImmutableMap.of(
"determinePartitions",
RowIngestionMeters.DETERMINE_PARTITIONS,
new ArrayList<>(),
"buildSegments",
RowIngestionMeters.BUILD_SEGMENTS,
Arrays.asList("Unparseable timestamp found! Event: {column_1=2014-01-01T00:00:10Z, column_2=a, column_3=1}")
);
Assert.assertEquals(expectedUnparseables, reportData.getUnparseableEvents());

View File

@ -30,6 +30,7 @@ import io.druid.guice.FirehoseModule;
import io.druid.indexer.HadoopIOConfig;
import io.druid.indexer.HadoopIngestionSpec;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.stats.RowIngestionMetersFactory;
import io.druid.indexing.common.task.IndexTask.IndexIOConfig;
import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
import io.druid.indexing.common.task.IndexTask.IndexTuningConfig;
@ -69,6 +70,7 @@ import java.util.List;
public class TaskSerdeTest
{
private final ObjectMapper jsonMapper;
private final RowIngestionMetersFactory rowIngestionMetersFactory;
private final IndexSpec indexSpec = new IndexSpec();
@Rule
@ -78,6 +80,7 @@ public class TaskSerdeTest
{
TestUtils testUtils = new TestUtils();
jsonMapper = testUtils.getTestObjectMapper();
rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory();
for (final Module jacksonModule : new FirehoseModule().getJacksonModules()) {
jsonMapper.registerModule(jacksonModule);
@ -214,7 +217,8 @@ public class TaskSerdeTest
),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null
null,
rowIngestionMetersFactory
);
final String json = jsonMapper.writeValueAsString(task);
@ -298,7 +302,8 @@ public class TaskSerdeTest
),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null
null,
rowIngestionMetersFactory
);
for (final Module jacksonModule : new FirehoseModule().getJacksonModules()) {

View File

@ -57,6 +57,7 @@ import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.stats.RowIngestionMetersFactory;
import io.druid.indexing.common.task.AbstractFixedIntervalTask;
import io.druid.indexing.common.task.IndexTask;
import io.druid.indexing.common.task.IndexTask.IndexIOConfig;
@ -153,12 +154,14 @@ public class TaskLifecycleTest
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
private static final TestUtils TEST_UTILS;
private static final RowIngestionMetersFactory ROW_INGESTION_METERS_FACTORY;
static {
TEST_UTILS = new TestUtils();
MAPPER = TEST_UTILS.getTestObjectMapper();
INDEX_MERGER_V9 = TEST_UTILS.getTestIndexMergerV9();
INDEX_IO = TEST_UTILS.getTestIndexIO();
ROW_INGESTION_METERS_FACTORY = TEST_UTILS.getRowIngestionMetersFactory();
}
private static final String HEAP_TASK_STORAGE = "HeapMemoryTaskStorage";
@ -691,7 +694,8 @@ public class TaskLifecycleTest
),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null
null,
ROW_INGESTION_METERS_FACTORY
);
final Optional<TaskStatus> preRunTaskStatus = tsqa.getStatus(indexTask.getId());
@ -770,7 +774,8 @@ public class TaskLifecycleTest
),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null
null,
ROW_INGESTION_METERS_FACTORY
);
final TaskStatus status = runTask(indexTask);
@ -1156,7 +1161,8 @@ public class TaskLifecycleTest
),
null,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
null
null,
ROW_INGESTION_METERS_FACTORY
);
final long startTime = System.currentTimeMillis();

View File

@ -63,6 +63,7 @@
<apache.curator.test.version>2.12.0</apache.curator.test.version>
<avatica.version>1.10.0</avatica.version>
<calcite.version>1.15.0</calcite.version>
<dropwizard.metrics.version>4.0.0</dropwizard.metrics.version>
<fastutil.version>8.1.0</fastutil.version>
<guava.version>16.0.1</guava.version>
<guice.version>4.1.0</guice.version>
@ -677,6 +678,11 @@
<artifactId>jna</artifactId>
<version>4.5.1</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>${dropwizard.metrics.version}</version>
</dependency>
<!-- Test Scope -->
<dependency>

View File

@ -19,9 +19,11 @@
package io.druid.indexing.overlord.supervisor;
import com.google.common.collect.ImmutableMap;
import io.druid.indexing.overlord.DataSourceMetadata;
import javax.annotation.Nullable;
import java.util.Map;
public interface Supervisor
{
@ -37,6 +39,11 @@ public interface Supervisor
SupervisorReport getStatus();
default Map<String, Map<String, Object>> getStats()
{
return ImmutableMap.of();
}
void reset(DataSourceMetadata dataSourceMetadata);
/**

View File

@ -1,64 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.realtime;
import com.google.common.collect.ImmutableMap;
import io.druid.indexer.TaskMetricsGetter;
import io.druid.indexer.TaskMetricsUtils;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
public class FireDepartmentMetricsTaskMetricsGetter implements TaskMetricsGetter
{
public static final List<String> KEYS = Arrays.asList(
TaskMetricsUtils.ROWS_PROCESSED,
TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS,
TaskMetricsUtils.ROWS_THROWN_AWAY,
TaskMetricsUtils.ROWS_UNPARSEABLE
);
private final FireDepartmentMetrics fireDepartmentMetrics;
public FireDepartmentMetricsTaskMetricsGetter(
FireDepartmentMetrics fireDepartmentMetrics
)
{
this.fireDepartmentMetrics = fireDepartmentMetrics;
}
@Override
public List<String> getKeys()
{
return KEYS;
}
@Override
public Map<String, Number> getTotalMetrics()
{
return ImmutableMap.of(
TaskMetricsUtils.ROWS_PROCESSED, fireDepartmentMetrics.processed(),
TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS, fireDepartmentMetrics.processedWithErrors(),
TaskMetricsUtils.ROWS_THROWN_AWAY, fireDepartmentMetrics.thrownAway(),
TaskMetricsUtils.ROWS_UNPARSEABLE, fireDepartmentMetrics.unparseable()
);
}
}

View File

@ -33,6 +33,10 @@ import java.util.List;
import java.util.Map;
/**
* RealtimeMetricsMonitor is only used by RealtimeIndexTask, this monitor only supports FireDepartmentMetrics.
* New ingestion task types should support RowIngestionMeters and use TaskRealtimeMetricsMonitor instead.
* Please see the comment on RowIngestionMeters for more information regarding the relationship between
* RowIngestionMeters and FireDepartmentMetrics.
*/
public class RealtimeMetricsMonitor extends AbstractMonitor
{

View File

@ -24,6 +24,7 @@ import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.multibindings.MapBinder;
import com.google.inject.name.Names;
import com.google.inject.util.Providers;
import io.airlift.airline.Command;
@ -37,8 +38,11 @@ import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.PolyBind;
import io.druid.guice.annotations.Self;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
import io.druid.indexing.common.stats.RowIngestionMetersFactory;
import io.druid.indexing.overlord.ForkingTaskRunner;
import io.druid.indexing.overlord.TaskRunner;
import io.druid.indexing.worker.Worker;
@ -92,6 +96,19 @@ public class CliMiddleManager extends ServerRunnable
binder.bind(ForkingTaskRunner.class).in(LazySingleton.class);
binder.bind(ChatHandlerProvider.class).toProvider(Providers.<ChatHandlerProvider>of(null));
PolyBind.createChoice(
binder,
"druid.indexer.task.rowIngestionMeters.type",
Key.get(RowIngestionMetersFactory.class),
Key.get(DropwizardRowIngestionMetersFactory.class)
);
final MapBinder<String, RowIngestionMetersFactory> rowIngestionMetersHandlerProviderBinder = PolyBind.optionBinder(
binder, Key.get(RowIngestionMetersFactory.class)
);
rowIngestionMetersHandlerProviderBinder.addBinding("dropwizard")
.to(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class);
binder.bind(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class);
binder.bind(WorkerTaskMonitor.class).in(ManageLifecycle.class);
binder.bind(WorkerCuratorCoordinator.class).in(ManageLifecycle.class);

View File

@ -54,6 +54,8 @@ import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
import io.druid.indexing.common.stats.RowIngestionMetersFactory;
import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer;
import io.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer;
import io.druid.indexing.overlord.ForkingTaskRunnerFactory;
@ -184,6 +186,19 @@ public class CliOverlord extends ServerRunnable
binder.bind(ChatHandlerProvider.class).toProvider(Providers.<ChatHandlerProvider>of(null));
PolyBind.createChoice(
binder,
"druid.indexer.task.rowIngestionMeters.type",
Key.get(RowIngestionMetersFactory.class),
Key.get(DropwizardRowIngestionMetersFactory.class)
);
final MapBinder<String, RowIngestionMetersFactory> rowIngestionMetersHandlerProviderBinder = PolyBind.optionBinder(
binder, Key.get(RowIngestionMetersFactory.class)
);
rowIngestionMetersHandlerProviderBinder.addBinding("dropwizard")
.to(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class);
binder.bind(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class);
configureTaskStorage(binder);
configureAutoscale(binder);
configureRunners(binder);

View File

@ -62,6 +62,8 @@ import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
import io.druid.indexing.common.stats.RowIngestionMetersFactory;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@ -159,6 +161,19 @@ public class CliPeon extends GuiceRunnable
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
PolyBind.createChoice(
binder,
"druid.indexer.task.rowIngestionMeters.type",
Key.get(RowIngestionMetersFactory.class),
Key.get(DropwizardRowIngestionMetersFactory.class)
);
final MapBinder<String, RowIngestionMetersFactory> rowIngestionMetersHandlerProviderBinder = PolyBind.optionBinder(
binder, Key.get(RowIngestionMetersFactory.class)
);
rowIngestionMetersHandlerProviderBinder.addBinding("dropwizard")
.to(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class);
binder.bind(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class);
PolyBind.createChoice(
binder,
"druid.indexer.task.chathandler.type",

View File

@ -27,6 +27,8 @@ import com.google.inject.multibindings.MapBinder;
import io.druid.cli.QueryJettyServerInitializer;
import io.druid.client.cache.CacheConfig;
import io.druid.client.coordinator.CoordinatorClient;
import io.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
import io.druid.indexing.common.stats.RowIngestionMetersFactory;
import io.druid.metadata.MetadataSegmentPublisher;
import io.druid.query.QuerySegmentWalker;
import io.druid.segment.realtime.FireDepartment;
@ -66,6 +68,19 @@ public class RealtimeModule implements Module
publisherBinder.addBinding("noop").to(NoopSegmentPublisher.class).in(LazySingleton.class);
publisherBinder.addBinding("metadata").to(MetadataSegmentPublisher.class).in(LazySingleton.class);
PolyBind.createChoice(
binder,
"druid.realtime.rowIngestionMeters.type",
Key.get(RowIngestionMetersFactory.class),
Key.get(DropwizardRowIngestionMetersFactory.class)
);
final MapBinder<String, RowIngestionMetersFactory> rowIngestionMetersHandlerProviderBinder = PolyBind.optionBinder(
binder, Key.get(RowIngestionMetersFactory.class)
);
rowIngestionMetersHandlerProviderBinder.addBinding("dropwizard")
.to(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class);
binder.bind(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class);
PolyBind.createChoice(
binder,
"druid.realtime.chathandler.type",