Display row stats for multiphase parallel indexing tasks (#12280)

Row stats are reported for single-phase tasks in the `/liveReports` and `/rowStats` APIs
and are also part of the overall task report. This commit adds support for reporting
row stats for multiphase tasks as well; an illustrative aggregation sketch follows the
changed-file summary below.

Changes:
- Add a `TaskReport` to the `GeneratedPartitionsReport` produced during hash and range partitioning
- Collect the reports for the `index_generate` phase in `ParallelIndexSupervisorTask`
Tejaswini Bandlamudi authored on 2022-03-02 10:10:31 +05:30, committed by GitHub
parent 50038d9344
commit 1af4c9c933
14 changed files with 577 additions and 230 deletions
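For orientation, the following minimal sketch (not part of the commit, and assuming the Druid indexing modules are on the classpath) illustrates the aggregation pattern this change introduces: each `index_generate` subtask attaches its ingestion-stats `TaskReport` to the `GeneratedPartitionsReport` it sends back, and the supervisor folds the per-subtask `buildSegments` totals into a single `RowIngestionMetersTotals` using the new `MutableRowIngestionMeters`. The class and method names come from the files below; the `main` harness and the sample counts are hypothetical.

import java.util.Arrays;
import java.util.List;
import org.apache.druid.segment.incremental.MutableRowIngestionMeters;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;

public class RowStatsAggregationSketch
{
  public static void main(String[] args)
  {
    // Stand-ins for the buildSegments totals extracted from two subtask task reports
    // (one GeneratedPartitionsReport per index_generate subtask); constructor order is
    // processed, processedWithError, thrownAway, unparseable.
    List<RowIngestionMetersTotals> subtaskTotals = Arrays.asList(
        new RowIngestionMetersTotals(120, 1, 2, 3),
        new RowIngestionMetersTotals(80, 0, 1, 0)
    );

    // The supervisor merges per-subtask totals with the new mutable accumulator.
    final MutableRowIngestionMeters buildSegmentsRowStats = new MutableRowIngestionMeters();
    for (RowIngestionMetersTotals totals : subtaskTotals) {
      buildSegmentsRowStats.addRowIngestionMetersTotals(totals);
    }

    // getTotals() is what ends up under rowStats -> totals -> buildSegments in the task report.
    RowIngestionMetersTotals merged = buildSegmentsRowStats.getTotals();
    System.out.println(merged.getProcessed() + " processed, " + merged.getUnparseable() + " unparseable");
  }
}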

GeneratedPartitionsMetadataReport.java

@ -21,8 +21,10 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.common.TaskReport;
import java.util.List;
import java.util.Map;
/**
* Report containing the {@link GenericPartitionStat}s created by a {@link PartialSegmentGenerateTask}. This report is
@ -35,9 +37,10 @@ class GeneratedPartitionsMetadataReport extends GeneratedPartitionsReport
@JsonCreator
GeneratedPartitionsMetadataReport(
@JsonProperty("taskId") String taskId,
@JsonProperty("partitionStats") List<PartitionStat> partitionStats
@JsonProperty("partitionStats") List<PartitionStat> partitionStats,
@JsonProperty("taskReport") Map<String, TaskReport> taskReport
)
{
super(taskId, partitionStats);
super(taskId, partitionStats, taskReport);
}
}

GeneratedPartitionsReport.java

@ -20,8 +20,10 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.common.TaskReport;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
@ -33,11 +35,13 @@ public class GeneratedPartitionsReport implements SubTaskReport
{
private final String taskId;
private final List<PartitionStat> partitionStats;
private final Map<String, TaskReport> taskReport;
GeneratedPartitionsReport(String taskId, List<PartitionStat> partitionStats)
GeneratedPartitionsReport(String taskId, List<PartitionStat> partitionStats, Map<String, TaskReport> taskReport)
{
this.taskId = taskId;
this.partitionStats = partitionStats;
this.taskReport = taskReport;
}
@Override
@ -47,6 +51,12 @@ public class GeneratedPartitionsReport implements SubTaskReport
return taskId;
}
@JsonProperty
public Map<String, TaskReport> getTaskReport()
{
return taskReport;
}
@JsonProperty
public List<PartitionStat> getPartitionStats()
{
@ -64,13 +74,14 @@ public class GeneratedPartitionsReport implements SubTaskReport
}
GeneratedPartitionsReport that = (GeneratedPartitionsReport) o;
return Objects.equals(taskId, that.taskId) &&
Objects.equals(partitionStats, that.partitionStats);
Objects.equals(partitionStats, that.partitionStats) &&
Objects.equals(taskReport, that.taskReport);
}
@Override
public int hashCode()
{
return Objects.hash(taskId, partitionStats);
return Objects.hash(taskId, partitionStats, taskReport);
}
@Override
@ -79,6 +90,7 @@ public class GeneratedPartitionsReport implements SubTaskReport
return "GeneratedPartitionsReport{" +
"taskId='" + taskId + '\'' +
", partitionStats=" + partitionStats +
", taskReport=" + taskReport +
'}';
}
}

ParallelIndexSupervisorTask.java

@ -66,6 +66,7 @@ import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.incremental.MutableRowIngestionMeters;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
@ -176,6 +177,9 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
@MonotonicNonNull
private volatile TaskToolbox toolbox;
@MonotonicNonNull
private Pair<Map<String, Object>, Map<String, Object>> indexGenerateRowStats;
private IngestionState ingestionState;
@JsonCreator
@ -726,6 +730,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
);
return TaskStatus.failure(getId(), errMsg);
}
indexGenerateRowStats = doGetRowStatsAndUnparseableEventsParallelMultiPhase(indexingRunner, true);
// 2. Partial segment merge phase
// partition (interval, partitionId) -> partition locations
@ -814,6 +819,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
return TaskStatus.failure(getId(), errMsg);
}
indexGenerateRowStats = doGetRowStatsAndUnparseableEventsParallelMultiPhase(indexingRunner, true);
// partition (interval, partitionId) -> partition locations
Map<Partition, List<PartitionLocation>> partitionToLocations =
getPartitionToLocations(indexingRunner.getReports());
@ -1477,10 +1484,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
boolean includeUnparseable
)
{
long processed = 0L;
long processedWithError = 0L;
long thrownAway = 0L;
long unparseable = 0L;
final MutableRowIngestionMeters buildSegmentsRowStats = new MutableRowIngestionMeters();
List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
@ -1492,28 +1496,67 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
LOG.warn("Got an empty task report from subtask: " + pushedSegmentsReport.getTaskId());
continue;
}
IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport = (IngestionStatsAndErrorsTaskReport) taskReport.get(
IngestionStatsAndErrorsTaskReport.REPORT_KEY);
IngestionStatsAndErrorsTaskReportData reportData =
(IngestionStatsAndErrorsTaskReportData) ingestionStatsAndErrorsReport.getPayload();
RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
);
RowIngestionMetersTotals rowIngestionMetersTotals =
getBuildSegmentsStatsFromTaskReport(taskReport, includeUnparseable, unparseableEvents);
if (includeUnparseable) {
List<ParseExceptionReport> taskUnparsebleEvents = (List<ParseExceptionReport>) reportData.getUnparseableEvents()
.get(RowIngestionMeters.BUILD_SEGMENTS);
unparseableEvents.addAll(taskUnparsebleEvents);
}
processed += totals.getProcessed();
processedWithError += totals.getProcessedWithError();
thrownAway += totals.getThrownAway();
unparseable += totals.getUnparseable();
buildSegmentsRowStats.addRowIngestionMetersTotals(rowIngestionMetersTotals);
}
// Get stats from running tasks
Set<String> runningTaskIds = parallelSinglePhaseRunner.getRunningTaskIds();
RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
parallelSinglePhaseRunner.getRunningTaskIds(),
unparseableEvents,
includeUnparseable
);
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
}
private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEventsParallelMultiPhase(
ParallelIndexTaskRunner<?, ?> currentRunner,
boolean includeUnparseable
)
{
if (indexGenerateRowStats != null) {
return Pair.of(indexGenerateRowStats.lhs, includeUnparseable ? indexGenerateRowStats.rhs : ImmutableMap.of());
} else if (!currentRunner.getName().equals("partial segment generation")) {
return Pair.of(ImmutableMap.of(), ImmutableMap.of());
} else {
Map<String, GeneratedPartitionsReport> completedSubtaskReports =
(Map<String, GeneratedPartitionsReport>) currentRunner.getReports();
final MutableRowIngestionMeters buildSegmentsRowStats = new MutableRowIngestionMeters();
final List<ParseExceptionReport> unparseableEvents = new ArrayList<>();
for (GeneratedPartitionsReport generatedPartitionsReport : completedSubtaskReports.values()) {
Map<String, TaskReport> taskReport = generatedPartitionsReport.getTaskReport();
if (taskReport == null || taskReport.isEmpty()) {
LOG.warn("Got an empty task report from subtask: " + generatedPartitionsReport.getTaskId());
continue;
}
RowIngestionMetersTotals rowStatsForCompletedTask =
getBuildSegmentsStatsFromTaskReport(taskReport, true, unparseableEvents);
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask);
}
RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks(
currentRunner.getRunningTaskIds(),
unparseableEvents,
includeUnparseable
);
buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks);
return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents);
}
}
private RowIngestionMetersTotals getRowStatsAndUnparseableEventsForRunningTasks(
Set<String> runningTaskIds,
List<ParseExceptionReport> unparseableEvents,
boolean includeUnparseable
)
{
final MutableRowIngestionMeters buildSegmentsRowStats = new MutableRowIngestionMeters();
for (String runningTaskId : runningTaskIds) {
try {
Map<String, Object> report = toolbox.getIndexingServiceClient().getTaskReport(runningTaskId);
@ -1521,6 +1564,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
// task does not have a running report yet
continue;
}
Map<String, Object> ingestionStatsAndErrors = (Map<String, Object>) report.get("ingestionStatsAndErrors");
Map<String, Object> payload = (Map<String, Object>) ingestionStatsAndErrors.get("payload");
Map<String, Object> rowStats = (Map<String, Object>) payload.get("rowStats");
@ -1529,33 +1573,53 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
if (includeUnparseable) {
Map<String, Object> taskUnparseableEvents = (Map<String, Object>) payload.get("unparseableEvents");
List<ParseExceptionReport> buildSegmentsUnparseableEvents = (List<ParseExceptionReport>) taskUnparseableEvents.get(
RowIngestionMeters.BUILD_SEGMENTS
);
List<ParseExceptionReport> buildSegmentsUnparseableEvents = (List<ParseExceptionReport>)
taskUnparseableEvents.get(RowIngestionMeters.BUILD_SEGMENTS);
unparseableEvents.addAll(buildSegmentsUnparseableEvents);
}
processed += ((Number) buildSegments.get("processed")).longValue();
processedWithError += ((Number) buildSegments.get("processedWithError")).longValue();
thrownAway += ((Number) buildSegments.get("thrownAway")).longValue();
unparseable += ((Number) buildSegments.get("unparseable")).longValue();
buildSegmentsRowStats.addRowIngestionMetersTotals(getTotalsFromBuildSegmentsRowStats(buildSegments));
}
catch (Exception e) {
LOG.warn(e, "Encountered exception when getting live subtask report for task: " + runningTaskId);
}
}
return buildSegmentsRowStats.getTotals();
}
private Pair<Map<String, Object>, Map<String, Object>> createStatsAndErrorsReport(
RowIngestionMetersTotals rowStats,
List<ParseExceptionReport> unparseableEvents
)
{
Map<String, Object> rowStatsMap = new HashMap<>();
Map<String, Object> totalsMap = new HashMap<>();
totalsMap.put(
RowIngestionMeters.BUILD_SEGMENTS,
new RowIngestionMetersTotals(processed, processedWithError, thrownAway, unparseable)
);
totalsMap.put(RowIngestionMeters.BUILD_SEGMENTS, rowStats);
rowStatsMap.put("totals", totalsMap);
return Pair.of(rowStatsMap, ImmutableMap.of(RowIngestionMeters.BUILD_SEGMENTS, unparseableEvents));
}
private RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport(
Map<String, TaskReport> taskReport,
boolean includeUnparseable,
List<ParseExceptionReport> unparseableEvents)
{
IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport = (IngestionStatsAndErrorsTaskReport) taskReport.get(
IngestionStatsAndErrorsTaskReport.REPORT_KEY);
IngestionStatsAndErrorsTaskReportData reportData =
(IngestionStatsAndErrorsTaskReportData) ingestionStatsAndErrorsReport.getPayload();
RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats(
reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS)
);
if (includeUnparseable) {
List<ParseExceptionReport> taskUnparsebleEvents = (List<ParseExceptionReport>) reportData.getUnparseableEvents()
.get(RowIngestionMeters.BUILD_SEGMENTS);
unparseableEvents.addAll(taskUnparsebleEvents);
}
return totals;
}
private Pair<Map<String, Object>, Map<String, Object>> doGetRowStatsAndUnparseableEvents(String full, boolean includeUnparseable)
{
if (currentSubTaskHolder == null) {
@ -1569,8 +1633,10 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
if (isParallelMode()) {
if (isGuaranteedRollup(ingestionSchema.getIOConfig(), ingestionSchema.getTuningConfig())) {
// multiphase is not supported yet
return Pair.of(ImmutableMap.of(), ImmutableMap.of());
return doGetRowStatsAndUnparseableEventsParallelMultiPhase(
(ParallelIndexTaskRunner<?, ?>) currentRunner,
includeUnparseable
);
} else {
return doGetRowStatsAndUnparseableEventsParallelSinglePhase(
(SinglePhaseParallelIndexTaskRunner) currentRunner,

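The multiphase aggregation above ultimately packages its totals under rowStats -> totals -> buildSegments, the same nested layout that getRowStatsAndUnparseableEventsForRunningTasks walks when it reads a live subtask report. As a hedged illustration (the helper name and null checks are mine; the key names are the ones used in the diff), a caller holding a "full" task report that has been round-tripped through JSON could dig out the buildSegments totals like this:

import java.util.Map;

public class TaskReportRowStats
{
  /**
   * Walks a "full" task report down to the buildSegments row-stat totals.
   * Every level is a plain Map because the report has been deserialized from JSON.
   * Returns null if the task has not produced a report yet.
   */
  @SuppressWarnings("unchecked")
  static Map<String, Object> extractBuildSegmentsTotals(Map<String, Object> report)
  {
    if (report == null || report.get("ingestionStatsAndErrors") == null) {
      return null;
    }
    final Map<String, Object> ingestionStatsAndErrors = (Map<String, Object>) report.get("ingestionStatsAndErrors");
    final Map<String, Object> payload = (Map<String, Object>) ingestionStatsAndErrors.get("payload");
    final Map<String, Object> rowStats = (Map<String, Object>) payload.get("rowStats");
    final Map<String, Object> totals = (Map<String, Object>) rowStats.get("totals");
    // The buildSegments entry carries the processed / processedWithError / thrownAway / unparseable counts.
    return (Map<String, Object>) totals.get("buildSegments");
  }
}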
PartialHashSegmentGenerateTask.java

@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;
@ -161,12 +162,12 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<G
}
@Override
GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment> segments)
GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment> segments, Map<String, TaskReport> taskReport)
{
List<PartitionStat> partitionStats = segments.stream()
.map(segment -> toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment))
.collect(Collectors.toList());
return new GeneratedPartitionsMetadataReport(getId(), partitionStats);
return new GeneratedPartitionsMetadataReport(getId(), partitionStats, taskReport);
}
/**

PartialRangeSegmentGenerateTask.java

@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClient;
@ -175,11 +176,11 @@ public class PartialRangeSegmentGenerateTask extends PartialSegmentGenerateTask<
}
@Override
GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment> segments)
GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment> segments, Map<String, TaskReport> taskReport)
{
List<PartitionStat> partitionStats = segments.stream()
.map(segment -> toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment))
.collect(Collectors.toList());
return new GeneratedPartitionsMetadataReport(getId(), partitionStats);
return new GeneratedPartitionsMetadataReport(getId(), partitionStats, taskReport);
}
}

PartialSegmentGenerateTask.java

@ -20,12 +20,18 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.BatchAppenderators;
import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.InputSourceProcessor;
import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch;
import org.apache.druid.indexing.common.task.SequenceNameFunction;
@ -35,6 +41,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskIn
import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
@ -46,10 +53,12 @@ import org.apache.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
import org.apache.druid.segment.realtime.appenderator.SegmentAllocator;
import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import org.apache.druid.timeline.DataSegment;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@ -64,6 +73,12 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e
private final String supervisorTaskId;
private final IndexTaskInputRowIteratorBuilder inputRowIteratorBuilder;
@MonotonicNonNull
private RowIngestionMeters buildSegmentsMeters;
@MonotonicNonNull
private ParseExceptionHandler parseExceptionHandler;
PartialSegmentGenerateTask(
String id,
String groupId,
@ -113,7 +128,10 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e
inputSource,
toolbox.getIndexingTmpDir()
);
taskClient.report(supervisorTaskId, createGeneratedPartitionsReport(toolbox, segments));
Map<String, TaskReport> taskReport = getTaskCompletionReports();
taskClient.report(supervisorTaskId, createGeneratedPartitionsReport(toolbox, segments, taskReport));
return TaskStatus.success(getId());
}
@ -131,7 +149,8 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e
*/
abstract T createGeneratedPartitionsReport(
TaskToolbox toolbox,
List<DataSegment> segments
List<DataSegment> segments,
Map<String, TaskReport> taskReport
);
private List<DataSegment> generateSegments(
@ -148,7 +167,7 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e
null
);
final FireDepartmentMetrics fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics();
final RowIngestionMeters buildSegmentsMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
buildSegmentsMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
toolbox.addMonitor(
new RealtimeMetricsMonitor(
@ -164,7 +183,7 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e
final SegmentAllocatorForBatch segmentAllocator = createSegmentAllocator(toolbox, taskClient);
final SequenceNameFunction sequenceNameFunction = segmentAllocator.getSequenceNameFunction();
final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
parseExceptionHandler = new ParseExceptionHandler(
buildSegmentsMeters,
tuningConfig.isLogParseExceptions(),
tuningConfig.getMaxParseExceptions(),
@ -203,7 +222,6 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e
parseExceptionHandler,
pushTimeout
);
return pushed.getSegments();
}
catch (Exception e) {
@ -218,4 +236,50 @@ abstract class PartialSegmentGenerateTask<T extends GeneratedPartitionsReport> e
}
}
}
/**
* Generate an IngestionStatsAndErrorsTaskReport for the task.
*/
private Map<String, TaskReport> getTaskCompletionReports()
{
return TaskReport.buildTaskReports(
new IngestionStatsAndErrorsTaskReport(
getId(),
new IngestionStatsAndErrorsTaskReportData(
IngestionState.COMPLETED,
getTaskCompletionUnparseableEvents(),
getTaskCompletionRowStats(),
"",
false, // not applicable for parallel subtask
segmentAvailabilityWaitTimeMs
)
)
);
}
private Map<String, Object> getTaskCompletionUnparseableEvents()
{
Map<String, Object> unparseableEventsMap = new HashMap<>();
List<ParseExceptionReport> parseExceptionMessages = IndexTaskUtils.getReportListFromSavedParseExceptions(
parseExceptionHandler.getSavedParseExceptionReports()
);
if (parseExceptionMessages != null) {
unparseableEventsMap.put(RowIngestionMeters.BUILD_SEGMENTS, parseExceptionMessages);
} else {
unparseableEventsMap.put(RowIngestionMeters.BUILD_SEGMENTS, ImmutableList.of());
}
return unparseableEventsMap;
}
private Map<String, Object> getTaskCompletionRowStats()
{
Map<String, Object> metrics = new HashMap<>();
metrics.put(
RowIngestionMeters.BUILD_SEGMENTS,
buildSegmentsMeters.getTotals()
);
return metrics;
}
}

AbstractMultiPhaseParallelIndexingTest.java

@ -149,7 +149,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
boolean appendToExisting
)
{
final ParallelIndexSupervisorTask task = newTask(
final ParallelIndexSupervisorTask task = createTask(
timestampSpec,
dimensionsSpec,
inputFormat,
@ -165,15 +165,26 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
return runTask(task, expectedTaskStatus);
}
Set<DataSegment> runTask(Task task, TaskState expectedTaskStatus)
void runTaskAndVerifyStatus(Task task, TaskState expectedTaskStatus)
{
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
TaskStatus taskStatus = getIndexingServiceClient().runAndWait(task);
Assert.assertEquals(expectedTaskStatus, taskStatus.getStatusCode());
}
Set<DataSegment> runTask(Task task, TaskState expectedTaskStatus)
{
runTaskAndVerifyStatus(task, expectedTaskStatus);
return getIndexingServiceClient().getPublishedSegments(task);
}
protected ParallelIndexSupervisorTask newTask(
Map<String, Object> runTaskAndGetReports(Task task, TaskState expectedTaskStatus)
{
runTaskAndVerifyStatus(task, expectedTaskStatus);
return getIndexingServiceClient().getTaskReport(task.getId());
}
protected ParallelIndexSupervisorTask createTask(
@Nullable TimestampSpec timestampSpec,
@Nullable DimensionsSpec dimensionsSpec,
@Nullable InputFormat inputFormat,

AbstractParallelIndexSupervisorTaskTest.java

@ -42,6 +42,7 @@ import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
@ -78,7 +79,9 @@ import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.LocalDataSegmentPuller;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
@ -101,6 +104,7 @@ import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
@ -111,6 +115,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -125,6 +130,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
{
@ -297,7 +303,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
null,
null,
null,
null,
5,
null,
null,
null
@ -543,6 +549,16 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
return taskRunner.run(injectIfNeeded(task));
}
@Override
public Map<String, Object> getTaskReport(String taskId)
{
final Optional<Task> task = getTaskStorage().getTask(taskId);
if (!task.isPresent()) {
return null;
}
return ((ParallelIndexSupervisorTask) task.get()).doGetLiveReports("full");
}
public TaskContainer getTaskContainer(String taskId)
{
return taskRunner.getTaskContainer(taskId);
@ -791,6 +807,111 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
}
}
protected Map<String, Object> buildExpectedTaskReportSequential(
String taskId,
List<ParseExceptionReport> expectedUnparseableEvents,
RowIngestionMetersTotals expectedDeterminePartitions,
RowIngestionMetersTotals expectedTotals
)
{
final Map<String, Object> payload = new HashMap<>();
payload.put("ingestionState", IngestionState.COMPLETED);
payload.put(
"unparseableEvents",
ImmutableMap.of("determinePartitions", ImmutableList.of(), "buildSegments", expectedUnparseableEvents)
);
Map<String, Object> emptyAverageMinuteMap = ImmutableMap.of(
"processed", 0.0,
"unparseable", 0.0,
"thrownAway", 0.0,
"processedWithError", 0.0
);
Map<String, Object> emptyAverages = ImmutableMap.of(
"1m", emptyAverageMinuteMap,
"5m", emptyAverageMinuteMap,
"15m", emptyAverageMinuteMap
);
payload.put(
"rowStats",
ImmutableMap.of(
"movingAverages",
ImmutableMap.of("determinePartitions", emptyAverages, "buildSegments", emptyAverages),
"totals",
ImmutableMap.of("determinePartitions", expectedDeterminePartitions, "buildSegments", expectedTotals)
)
);
final Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
ingestionStatsAndErrors.put("taskId", taskId);
ingestionStatsAndErrors.put("payload", payload);
ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
return Collections.singletonMap("ingestionStatsAndErrors", ingestionStatsAndErrors);
}
protected Map<String, Object> buildExpectedTaskReportParallel(
String taskId,
List<ParseExceptionReport> expectedUnparseableEvents,
RowIngestionMetersTotals expectedTotals
)
{
Map<String, Object> returnMap = new HashMap<>();
Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
Map<String, Object> payload = new HashMap<>();
payload.put("ingestionState", IngestionState.COMPLETED);
payload.put("unparseableEvents", ImmutableMap.of("buildSegments", expectedUnparseableEvents));
payload.put("rowStats", ImmutableMap.of("totals", ImmutableMap.of("buildSegments", expectedTotals)));
ingestionStatsAndErrors.put("taskId", taskId);
ingestionStatsAndErrors.put("payload", payload);
ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
return returnMap;
}
protected void compareTaskReports(
Map<String, Object> expectedReports,
Map<String, Object> actualReports
)
{
expectedReports = (Map<String, Object>) expectedReports.get("ingestionStatsAndErrors");
actualReports = (Map<String, Object>) actualReports.get("ingestionStatsAndErrors");
Assert.assertEquals(expectedReports.get("taskId"), actualReports.get("taskId"));
Assert.assertEquals(expectedReports.get("type"), actualReports.get("type"));
Map<String, Object> expectedPayload = (Map<String, Object>) expectedReports.get("payload");
Map<String, Object> actualPayload = (Map<String, Object>) actualReports.get("payload");
Assert.assertEquals(expectedPayload.get("ingestionState"), actualPayload.get("ingestionState"));
Assert.assertEquals(expectedPayload.get("rowStats"), actualPayload.get("rowStats"));
Assert.assertEquals(expectedPayload.get("ingestionState"), actualPayload.get("ingestionState"));
List<ParseExceptionReport> expectedParseExceptionReports =
(List<ParseExceptionReport>) ((Map<String, Object>)
expectedPayload.get("unparseableEvents")).get("buildSegments");
List<ParseExceptionReport> actualParseExceptionReports =
(List<ParseExceptionReport>) ((Map<String, Object>)
actualPayload.get("unparseableEvents")).get("buildSegments");
List<String> expectedMessages = expectedParseExceptionReports
.stream().map(r -> r.getDetails().get(0)).collect(Collectors.toList());
List<String> actualMessages = actualParseExceptionReports
.stream().map(r -> r.getDetails().get(0)).collect(Collectors.toList());
Assert.assertEquals(expectedMessages, actualMessages);
List<String> expectedInputs = expectedParseExceptionReports
.stream().map(ParseExceptionReport::getInput).collect(Collectors.toList());
List<String> actualInputs = actualParseExceptionReports
.stream().map(ParseExceptionReport::getInput).collect(Collectors.toList());
Assert.assertEquals(expectedInputs, actualInputs);
}
static class LocalParallelIndexTaskClientFactory implements IndexTaskClientFactory<ParallelIndexSupervisorTaskClient>
{
private final ConcurrentMap<String, TaskContainer> tasks;

HashPartitionMultiPhaseParallelIndexingTest.java

@ -36,6 +36,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;
@ -161,15 +162,14 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
public void testRun() throws Exception
{
final Integer maxRowsPerSegment = numShards == null ? 10 : null;
final Set<DataSegment> publishedSegments = runTestTask(
final Set<DataSegment> publishedSegments = runTask(createTask(
new HashedPartitionsSpec(
maxRowsPerSegment,
numShards,
ImmutableList.of("dim1", "dim2")
),
TaskState.SUCCESS,
false
);
), TaskState.SUCCESS);
final Map<Interval, Integer> expectedIntervalToNumSegments = computeExpectedIntervalToNumSegments(
maxRowsPerSegment,
@ -182,16 +182,14 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
public void testRunWithHashPartitionFunction() throws Exception
{
final Integer maxRowsPerSegment = numShards == null ? 10 : null;
final Set<DataSegment> publishedSegments = runTestTask(
final Set<DataSegment> publishedSegments = runTask(createTask(
new HashedPartitionsSpec(
maxRowsPerSegment,
numShards,
ImmutableList.of("dim1", "dim2"),
HashPartitionFunction.MURMUR3_32_ABS
),
TaskState.SUCCESS,
false
);
false), TaskState.SUCCESS);
final Map<Interval, Integer> expectedIntervalToNumSegments = computeExpectedIntervalToNumSegments(
maxRowsPerSegment,
numShards
@ -199,6 +197,39 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
assertHashedPartition(publishedSegments, expectedIntervalToNumSegments);
}
@Test
public void testRowStats()
{
final Integer maxRowsPerSegment = numShards == null ? 10 : null;
ParallelIndexSupervisorTask task = createTask(
new HashedPartitionsSpec(
maxRowsPerSegment,
numShards,
ImmutableList.of("dim1", "dim2"),
HashPartitionFunction.MURMUR3_32_ABS),
false);
RowIngestionMetersTotals expectedTotals = new RowIngestionMetersTotals(200, 0, 0, 0);
Map<String, Object> expectedReports;
if (maxNumConcurrentSubTasks <= 1) {
expectedReports = buildExpectedTaskReportSequential(
task.getId(),
ImmutableList.of(),
numShards == null ? expectedTotals : new RowIngestionMetersTotals(0, 0, 0, 0),
expectedTotals
);
} else {
// when useInputFormatApi is false, maxConcurrentSubTasks=2 and it uses the single phase runner
// instead of sequential runner
expectedReports = buildExpectedTaskReportParallel(
task.getId(),
ImmutableList.of(),
expectedTotals
);
}
Map<String, Object> actualReports = runTaskAndGetReports(task, TaskState.SUCCESS);
compareTaskReports(expectedReports, actualReports);
}
private Map<Interval, Integer> computeExpectedIntervalToNumSegments(
@Nullable Integer maxRowsPerSegment,
@Nullable Integer numShards
@ -224,27 +255,26 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
{
final Set<DataSegment> publishedSegments = new HashSet<>();
publishedSegments.addAll(
runTestTask(
new HashedPartitionsSpec(null, numShards, ImmutableList.of("dim1", "dim2")),
TaskState.SUCCESS,
false
)
runTask(
createTask(
new HashedPartitionsSpec(null, numShards, ImmutableList.of("dim1", "dim2")),
false),
TaskState.SUCCESS)
);
// Append
publishedSegments.addAll(
runTestTask(
new DynamicPartitionsSpec(5, null),
TaskState.SUCCESS,
true
)
);
runTask(
createTask(
new DynamicPartitionsSpec(5, null),
true),
TaskState.SUCCESS));
// And append again
publishedSegments.addAll(
runTestTask(
new DynamicPartitionsSpec(10, null),
TaskState.SUCCESS,
true
)
runTask(
createTask(
new DynamicPartitionsSpec(10, null),
true),
TaskState.SUCCESS)
);
final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
@ -275,14 +305,13 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
}
}
private Set<DataSegment> runTestTask(
private ParallelIndexSupervisorTask createTask(
PartitionsSpec partitionsSpec,
TaskState expectedTaskState,
boolean appendToExisting
)
{
if (isUseInputFormatApi()) {
return runTestTask(
return createTask(
TIMESTAMP_SPEC,
DIMENSIONS_SPEC,
INPUT_FORMAT,
@ -292,11 +321,10 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
"test_*",
partitionsSpec,
maxNumConcurrentSubTasks,
expectedTaskState,
appendToExisting
);
} else {
return runTestTask(
return createTask(
null,
null,
null,
@ -306,7 +334,6 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
"test_*",
partitionsSpec,
maxNumConcurrentSubTasks,
expectedTaskState,
appendToExisting
);
}

ParallelIndexSupervisorTaskTest.java

@ -355,13 +355,13 @@ public class ParallelIndexSupervisorTaskTest
createRangePartitionStat(day2, 7),
createRangePartitionStat(day1, 0),
createRangePartitionStat(day2, 1)
)));
), null));
taskIdToReport.put(task2, new GeneratedPartitionsReport(task2, Arrays.asList(
createRangePartitionStat(day1, 4),
createRangePartitionStat(day1, 6),
createRangePartitionStat(day2, 1),
createRangePartitionStat(day1, 1)
)));
), null));
Map<ParallelIndexSupervisorTask.Partition, List<PartitionLocation>> partitionToLocations
= ParallelIndexSupervisorTask.getPartitionToLocations(taskIdToReport);

RangePartitionMultiPhaseParallelIndexingTest.java

@ -41,6 +41,7 @@ import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
@ -225,22 +226,39 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
public void createsCorrectRangePartitions() throws Exception
{
int targetRowsPerSegment = NUM_ROW * 2 / DIM_FILE_CARDINALITY / NUM_PARTITION;
final Set<DataSegment> publishedSegments = runTestTask(
final Set<DataSegment> publishedSegments = runTask(runTestTask(
new DimensionRangePartitionsSpec(
targetRowsPerSegment,
null,
Collections.singletonList(DIM1),
false
),
useMultivalueDim ? TaskState.FAILED : TaskState.SUCCESS,
false
);
), useMultivalueDim ? TaskState.FAILED : TaskState.SUCCESS);
if (!useMultivalueDim) {
assertRangePartitions(publishedSegments);
}
}
@Test
public void testRowStats()
{
if (useMultivalueDim) {
return;
}
final int targetRowsPerSegment = NUM_ROW / DIM_FILE_CARDINALITY / NUM_PARTITION;
ParallelIndexSupervisorTask task = runTestTask(
new SingleDimensionPartitionsSpec(targetRowsPerSegment, null, DIM1, false),
false);
Map<String, Object> expectedReports = buildExpectedTaskReportParallel(
task.getId(),
ImmutableList.of(),
new RowIngestionMetersTotals(600, 0, 0, 0));
Map<String, Object> actualReports = runTaskAndGetReports(task, TaskState.SUCCESS);
compareTaskReports(expectedReports, actualReports);
}
@Test
public void testAppendLinearlyPartitionedSegmentsToHashPartitionedDatasourceSuccessfullyAppend()
{
@ -250,32 +268,29 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
final int targetRowsPerSegment = NUM_ROW / DIM_FILE_CARDINALITY / NUM_PARTITION;
final Set<DataSegment> publishedSegments = new HashSet<>();
publishedSegments.addAll(
runTestTask(
runTask(runTestTask(
new SingleDimensionPartitionsSpec(
targetRowsPerSegment,
null,
DIM1,
false
),
TaskState.SUCCESS,
false
)
), TaskState.SUCCESS)
);
// Append
publishedSegments.addAll(
runTestTask(
runTask(runTestTask(
new DynamicPartitionsSpec(5, null),
TaskState.SUCCESS,
true
)
), TaskState.SUCCESS)
);
// And append again
publishedSegments.addAll(
runTestTask(
runTask(runTestTask(
new DynamicPartitionsSpec(10, null),
TaskState.SUCCESS,
true
)
), TaskState.SUCCESS)
);
final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
@ -306,14 +321,13 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
}
}
private Set<DataSegment> runTestTask(
private ParallelIndexSupervisorTask runTestTask(
PartitionsSpec partitionsSpec,
TaskState expectedTaskState,
boolean appendToExisting
)
{
if (isUseInputFormatApi()) {
return runTestTask(
return createTask(
TIMESTAMP_SPEC,
DIMENSIONS_SPEC,
INPUT_FORMAT,
@ -323,11 +337,10 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
TEST_FILE_NAME_PREFIX + "*",
partitionsSpec,
maxNumConcurrentSubTasks,
expectedTaskState,
appendToExisting
);
} else {
return runTestTask(
return createTask(
null,
null,
null,
@ -337,7 +350,6 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
TEST_FILE_NAME_PREFIX + "*",
partitionsSpec,
maxNumConcurrentSubTasks,
expectedTaskState,
appendToExisting
);
}

SinglePhaseParallelIndexingTest.java

@ -27,7 +27,6 @@ import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
@ -71,14 +70,12 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@RunWith(Parameterized.class)
public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSupervisorTaskTest
@ -335,7 +332,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
Collections.emptyList()
);
Map<String, Object> actualReports = task.doGetLiveReports("full");
Map<String, Object> expectedReports = getExpectedTaskReportParallel(
Map<String, Object> expectedReports = buildExpectedTaskReportParallel(
task.getId(),
ImmutableList.of(
new ParseExceptionReport(
@ -360,68 +357,6 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
compareTaskReports(expectedReports, actualReports);
}
private void compareTaskReports(
Map<String, Object> expectedReports,
Map<String, Object> actualReports
)
{
expectedReports = (Map<String, Object>) expectedReports.get("ingestionStatsAndErrors");
actualReports = (Map<String, Object>) actualReports.get("ingestionStatsAndErrors");
Assert.assertEquals(expectedReports.get("taskId"), actualReports.get("taskId"));
Assert.assertEquals(expectedReports.get("type"), actualReports.get("type"));
Map<String, Object> expectedPayload = (Map<String, Object>) expectedReports.get("payload");
Map<String, Object> actualPayload = (Map<String, Object>) actualReports.get("payload");
Assert.assertEquals(expectedPayload.get("ingestionState"), actualPayload.get("ingestionState"));
Assert.assertEquals(expectedPayload.get("rowStats"), actualPayload.get("rowStats"));
Assert.assertEquals(expectedPayload.get("ingestionState"), actualPayload.get("ingestionState"));
List<ParseExceptionReport> expectedParseExceptionReports = (List<ParseExceptionReport>) ((Map<String, Object>) expectedPayload.get("unparseableEvents"))
.get("buildSegments");
List<ParseExceptionReport> actualParseExceptionReports = (List<ParseExceptionReport>) ((Map<String, Object>) actualPayload.get("unparseableEvents"))
.get("buildSegments");
List<String> expectedMessages = expectedParseExceptionReports.stream().map((r) -> {
return r.getDetails().get(0);
}).collect(Collectors.toList());
List<String> actualMessages = actualParseExceptionReports.stream().map((r) -> {
return r.getDetails().get(0);
}).collect(Collectors.toList());
Assert.assertEquals(expectedMessages, actualMessages);
List<String> expectedInputs = expectedParseExceptionReports.stream().map((r) -> {
return r.getInput();
}).collect(Collectors.toList());
List<String> actualInputs = actualParseExceptionReports.stream().map((r) -> {
return r.getInput();
}).collect(Collectors.toList());
Assert.assertEquals(expectedInputs, actualInputs);
}
private Map<String, Object> getExpectedTaskReportParallel(
String taskId,
List<ParseExceptionReport> expectedUnparseableEvents,
RowIngestionMetersTotals expectedTotals
)
{
Map<String, Object> returnMap = new HashMap<>();
Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
Map<String, Object> payload = new HashMap<>();
payload.put("ingestionState", IngestionState.COMPLETED);
payload.put("unparseableEvents", ImmutableMap.of("buildSegments", expectedUnparseableEvents));
payload.put("rowStats", ImmutableMap.of("totals", ImmutableMap.of("buildSegments", expectedTotals)));
ingestionStatsAndErrors.put("taskId", taskId);
ingestionStatsAndErrors.put("payload", payload);
ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
return returnMap;
}
//
// Ingest all data.
@ -475,15 +410,16 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
Map<String, Object> expectedReports;
if (useInputFormatApi) {
expectedReports = getExpectedTaskReportSequential(
expectedReports = buildExpectedTaskReportSequential(
task.getId(),
expectedUnparseableEvents,
new RowIngestionMetersTotals(0, 0, 0, 0),
expectedTotals
);
} else {
// when useInputFormatApi is false, maxConcurrentSubTasks=2 and it uses the single phase runner
// instead of sequential runner
expectedReports = getExpectedTaskReportParallel(
expectedReports = buildExpectedTaskReportParallel(
task.getId(),
expectedUnparseableEvents,
expectedTotals
@ -494,65 +430,6 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
System.out.println(actualReports);
}
private Map<String, Object> getExpectedTaskReportSequential(
String taskId,
List<ParseExceptionReport> expectedUnparseableEvents,
RowIngestionMetersTotals expectedTotals
)
{
Map<String, Object> returnMap = new HashMap<>();
Map<String, Object> ingestionStatsAndErrors = new HashMap<>();
Map<String, Object> payload = new HashMap<>();
payload.put("ingestionState", IngestionState.COMPLETED);
payload.put(
"unparseableEvents",
ImmutableMap.of(
"determinePartitions", ImmutableList.of(),
"buildSegments", expectedUnparseableEvents
)
);
Map<String, Object> emptyAverageMinuteMap = ImmutableMap.of(
"processed", 0.0,
"unparseable", 0.0,
"thrownAway", 0.0,
"processedWithError", 0.0
);
Map<String, Object> emptyAverages = ImmutableMap.of(
"1m", emptyAverageMinuteMap,
"5m", emptyAverageMinuteMap,
"15m", emptyAverageMinuteMap
);
payload.put(
"rowStats",
ImmutableMap.of(
"movingAverages",
ImmutableMap.of(
"determinePartitions",
emptyAverages,
"buildSegments",
emptyAverages
),
"totals",
ImmutableMap.of(
"determinePartitions",
new RowIngestionMetersTotals(0, 0, 0, 0),
"buildSegments",
expectedTotals
)
)
);
ingestionStatsAndErrors.put("taskId", taskId);
ingestionStatsAndErrors.put("payload", payload);
ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors");
returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors);
return returnMap;
}
@Test
public void testPublishEmptySegments()
{

MutableRowIngestionMeters.java (new file)

@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.incremental;
import java.util.Map;
public class MutableRowIngestionMeters implements RowIngestionMeters
{
private long processed;
private long processedWithError;
private long unparseable;
private long thrownAway;
public MutableRowIngestionMeters()
{
this.processed = 0;
this.processedWithError = 0;
this.unparseable = 0;
this.thrownAway = 0;
}
@Override
public long getProcessed()
{
return processed;
}
@Override
public void incrementProcessed()
{
processed++;
}
@Override
public long getProcessedWithError()
{
return processedWithError;
}
@Override
public void incrementProcessedWithError()
{
processedWithError++;
}
@Override
public long getUnparseable()
{
return unparseable;
}
@Override
public void incrementUnparseable()
{
unparseable++;
}
@Override
public long getThrownAway()
{
return thrownAway;
}
@Override
public void incrementThrownAway()
{
thrownAway++;
}
@Override
public RowIngestionMetersTotals getTotals()
{
return new RowIngestionMetersTotals(processed, processedWithError, thrownAway, unparseable);
}
@Override
public Map<String, Object> getMovingAverages()
{
throw new UnsupportedOperationException();
}
public void addRowIngestionMetersTotals(RowIngestionMetersTotals rowIngestionMetersTotals)
{
this.processed += rowIngestionMetersTotals.getProcessed();
this.processedWithError += rowIngestionMetersTotals.getProcessedWithError();
this.unparseable += rowIngestionMetersTotals.getUnparseable();
this.thrownAway += rowIngestionMetersTotals.getThrownAway();
}
}

MutableRowIngestionMetersTest.java (new file)

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.incremental;
import org.junit.Assert;
import org.junit.Test;
public class MutableRowIngestionMetersTest
{
@Test
public void testIncrement()
{
MutableRowIngestionMeters mutableRowIngestionMeters = new MutableRowIngestionMeters();
mutableRowIngestionMeters.incrementProcessed();
mutableRowIngestionMeters.incrementProcessedWithError();
mutableRowIngestionMeters.incrementUnparseable();
mutableRowIngestionMeters.incrementThrownAway();
Assert.assertEquals(mutableRowIngestionMeters.getTotals(), new RowIngestionMetersTotals(1, 1, 1, 1));
}
@Test
public void testAddRowIngestionMetersTotals()
{
MutableRowIngestionMeters mutableRowIngestionMeters = new MutableRowIngestionMeters();
RowIngestionMetersTotals rowIngestionMetersTotals = new RowIngestionMetersTotals(10, 1, 0, 1);
mutableRowIngestionMeters.addRowIngestionMetersTotals(rowIngestionMetersTotals);
Assert.assertEquals(mutableRowIngestionMeters.getTotals(), rowIngestionMetersTotals);
}
}