diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java index 553cc1d6e8e..a83c3cc7412 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java @@ -21,8 +21,10 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.indexing.common.TaskReport; import java.util.List; +import java.util.Map; /** * Report containing the {@link GenericPartitionStat}s created by a {@link PartialSegmentGenerateTask}. This report is @@ -35,9 +37,10 @@ class GeneratedPartitionsMetadataReport extends GeneratedPartitionsReport @JsonCreator GeneratedPartitionsMetadataReport( @JsonProperty("taskId") String taskId, - @JsonProperty("partitionStats") List partitionStats + @JsonProperty("partitionStats") List partitionStats, + @JsonProperty("taskReport") Map taskReport ) { - super(taskId, partitionStats); + super(taskId, partitionStats, taskReport); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java index d05d4ddd943..1fa025d1c91 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java @@ -20,8 +20,10 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.indexing.common.TaskReport; import java.util.List; +import java.util.Map; import java.util.Objects; /** @@ -33,11 +35,13 @@ public class GeneratedPartitionsReport implements SubTaskReport { private final String taskId; private final List partitionStats; + private final Map taskReport; - GeneratedPartitionsReport(String taskId, List partitionStats) + GeneratedPartitionsReport(String taskId, List partitionStats, Map taskReport) { this.taskId = taskId; this.partitionStats = partitionStats; + this.taskReport = taskReport; } @Override @@ -47,6 +51,12 @@ public class GeneratedPartitionsReport implements SubTaskReport return taskId; } + @JsonProperty + public Map getTaskReport() + { + return taskReport; + } + @JsonProperty public List getPartitionStats() { @@ -64,13 +74,14 @@ public class GeneratedPartitionsReport implements SubTaskReport } GeneratedPartitionsReport that = (GeneratedPartitionsReport) o; return Objects.equals(taskId, that.taskId) && - Objects.equals(partitionStats, that.partitionStats); + Objects.equals(partitionStats, that.partitionStats) && + Objects.equals(taskReport, that.taskReport); } @Override public int hashCode() { - return Objects.hash(taskId, partitionStats); + return Objects.hash(taskId, partitionStats, taskReport); } @Override @@ -79,6 +90,7 @@ public class GeneratedPartitionsReport implements SubTaskReport return "GeneratedPartitionsReport{" + "taskId='" + taskId + '\'' + ", partitionStats=" + partitionStats + + ", taskReport=" + 
taskReport + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index f46d8ea1c33..fdab29aba32 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -66,6 +66,7 @@ import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.segment.incremental.MutableRowIngestionMeters; import org.apache.druid.segment.incremental.ParseExceptionReport; import org.apache.druid.segment.incremental.RowIngestionMeters; import org.apache.druid.segment.incremental.RowIngestionMetersTotals; @@ -176,6 +177,9 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen @MonotonicNonNull private volatile TaskToolbox toolbox; + @MonotonicNonNull + private Pair, Map> indexGenerateRowStats; + private IngestionState ingestionState; @JsonCreator @@ -726,6 +730,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen ); return TaskStatus.failure(getId(), errMsg); } + indexGenerateRowStats = doGetRowStatsAndUnparseableEventsParallelMultiPhase(indexingRunner, true); // 2. Partial segment merge phase // partition (interval, partitionId) -> partition locations @@ -814,6 +819,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen return TaskStatus.failure(getId(), errMsg); } + indexGenerateRowStats = doGetRowStatsAndUnparseableEventsParallelMultiPhase(indexingRunner, true); + // partition (interval, partitionId) -> partition locations Map> partitionToLocations = getPartitionToLocations(indexingRunner.getReports()); @@ -1477,10 +1484,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen boolean includeUnparseable ) { - long processed = 0L; - long processedWithError = 0L; - long thrownAway = 0L; - long unparseable = 0L; + final MutableRowIngestionMeters buildSegmentsRowStats = new MutableRowIngestionMeters(); List unparseableEvents = new ArrayList<>(); @@ -1492,28 +1496,67 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen LOG.warn("Got an empty task report from subtask: " + pushedSegmentsReport.getTaskId()); continue; } - IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport = (IngestionStatsAndErrorsTaskReport) taskReport.get( - IngestionStatsAndErrorsTaskReport.REPORT_KEY); - IngestionStatsAndErrorsTaskReportData reportData = - (IngestionStatsAndErrorsTaskReportData) ingestionStatsAndErrorsReport.getPayload(); - RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats( - reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS) - ); + RowIngestionMetersTotals rowIngestionMetersTotals = + getBuildSegmentsStatsFromTaskReport(taskReport, includeUnparseable, unparseableEvents); - if (includeUnparseable) { - List taskUnparsebleEvents = (List) reportData.getUnparseableEvents() - .get(RowIngestionMeters.BUILD_SEGMENTS); - unparseableEvents.addAll(taskUnparsebleEvents); - } - - processed += totals.getProcessed(); - processedWithError += totals.getProcessedWithError(); - 
thrownAway += totals.getThrownAway(); - unparseable += totals.getUnparseable(); + buildSegmentsRowStats.addRowIngestionMetersTotals(rowIngestionMetersTotals); } - // Get stats from running tasks - Set runningTaskIds = parallelSinglePhaseRunner.getRunningTaskIds(); + RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks( + parallelSinglePhaseRunner.getRunningTaskIds(), + unparseableEvents, + includeUnparseable + ); + buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks); + + return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents); + } + + private Pair, Map> doGetRowStatsAndUnparseableEventsParallelMultiPhase( + ParallelIndexTaskRunner currentRunner, + boolean includeUnparseable + ) + { + if (indexGenerateRowStats != null) { + return Pair.of(indexGenerateRowStats.lhs, includeUnparseable ? indexGenerateRowStats.rhs : ImmutableMap.of()); + } else if (!currentRunner.getName().equals("partial segment generation")) { + return Pair.of(ImmutableMap.of(), ImmutableMap.of()); + } else { + Map completedSubtaskReports = + (Map) currentRunner.getReports(); + + final MutableRowIngestionMeters buildSegmentsRowStats = new MutableRowIngestionMeters(); + final List unparseableEvents = new ArrayList<>(); + for (GeneratedPartitionsReport generatedPartitionsReport : completedSubtaskReports.values()) { + Map taskReport = generatedPartitionsReport.getTaskReport(); + if (taskReport == null || taskReport.isEmpty()) { + LOG.warn("Got an empty task report from subtask: " + generatedPartitionsReport.getTaskId()); + continue; + } + RowIngestionMetersTotals rowStatsForCompletedTask = + getBuildSegmentsStatsFromTaskReport(taskReport, true, unparseableEvents); + + buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForCompletedTask); + } + + RowIngestionMetersTotals rowStatsForRunningTasks = getRowStatsAndUnparseableEventsForRunningTasks( + currentRunner.getRunningTaskIds(), + unparseableEvents, + includeUnparseable + ); + buildSegmentsRowStats.addRowIngestionMetersTotals(rowStatsForRunningTasks); + + return createStatsAndErrorsReport(buildSegmentsRowStats.getTotals(), unparseableEvents); + } + } + + private RowIngestionMetersTotals getRowStatsAndUnparseableEventsForRunningTasks( + Set runningTaskIds, + List unparseableEvents, + boolean includeUnparseable + ) + { + final MutableRowIngestionMeters buildSegmentsRowStats = new MutableRowIngestionMeters(); for (String runningTaskId : runningTaskIds) { try { Map report = toolbox.getIndexingServiceClient().getTaskReport(runningTaskId); @@ -1521,6 +1564,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen // task does not have a running report yet continue; } + Map ingestionStatsAndErrors = (Map) report.get("ingestionStatsAndErrors"); Map payload = (Map) ingestionStatsAndErrors.get("payload"); Map rowStats = (Map) payload.get("rowStats"); @@ -1529,33 +1573,53 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen if (includeUnparseable) { Map taskUnparseableEvents = (Map) payload.get("unparseableEvents"); - List buildSegmentsUnparseableEvents = (List) taskUnparseableEvents.get( - RowIngestionMeters.BUILD_SEGMENTS - ); + List buildSegmentsUnparseableEvents = (List) + taskUnparseableEvents.get(RowIngestionMeters.BUILD_SEGMENTS); unparseableEvents.addAll(buildSegmentsUnparseableEvents); } - processed += ((Number) buildSegments.get("processed")).longValue(); - processedWithError += ((Number) 
buildSegments.get("processedWithError")).longValue(); - thrownAway += ((Number) buildSegments.get("thrownAway")).longValue(); - unparseable += ((Number) buildSegments.get("unparseable")).longValue(); + buildSegmentsRowStats.addRowIngestionMetersTotals(getTotalsFromBuildSegmentsRowStats(buildSegments)); } catch (Exception e) { LOG.warn(e, "Encountered exception when getting live subtask report for task: " + runningTaskId); } } + return buildSegmentsRowStats.getTotals(); + } + private Pair, Map> createStatsAndErrorsReport( + RowIngestionMetersTotals rowStats, + List unparseableEvents + ) + { Map rowStatsMap = new HashMap<>(); Map totalsMap = new HashMap<>(); - totalsMap.put( - RowIngestionMeters.BUILD_SEGMENTS, - new RowIngestionMetersTotals(processed, processedWithError, thrownAway, unparseable) - ); + totalsMap.put(RowIngestionMeters.BUILD_SEGMENTS, rowStats); rowStatsMap.put("totals", totalsMap); return Pair.of(rowStatsMap, ImmutableMap.of(RowIngestionMeters.BUILD_SEGMENTS, unparseableEvents)); } + private RowIngestionMetersTotals getBuildSegmentsStatsFromTaskReport( + Map taskReport, + boolean includeUnparseable, + List unparseableEvents) + { + IngestionStatsAndErrorsTaskReport ingestionStatsAndErrorsReport = (IngestionStatsAndErrorsTaskReport) taskReport.get( + IngestionStatsAndErrorsTaskReport.REPORT_KEY); + IngestionStatsAndErrorsTaskReportData reportData = + (IngestionStatsAndErrorsTaskReportData) ingestionStatsAndErrorsReport.getPayload(); + RowIngestionMetersTotals totals = getTotalsFromBuildSegmentsRowStats( + reportData.getRowStats().get(RowIngestionMeters.BUILD_SEGMENTS) + ); + if (includeUnparseable) { + List taskUnparsebleEvents = (List) reportData.getUnparseableEvents() + .get(RowIngestionMeters.BUILD_SEGMENTS); + unparseableEvents.addAll(taskUnparsebleEvents); + } + return totals; + } + private Pair, Map> doGetRowStatsAndUnparseableEvents(String full, boolean includeUnparseable) { if (currentSubTaskHolder == null) { @@ -1569,8 +1633,10 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen if (isParallelMode()) { if (isGuaranteedRollup(ingestionSchema.getIOConfig(), ingestionSchema.getTuningConfig())) { - // multiphase is not supported yet - return Pair.of(ImmutableMap.of(), ImmutableMap.of()); + return doGetRowStatsAndUnparseableEventsParallelMultiPhase( + (ParallelIndexTaskRunner) currentRunner, + includeUnparseable + ); } else { return doGetRowStatsAndUnparseableEventsParallelSinglePhase( (SinglePhaseParallelIndexTaskRunner) currentRunner, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java index ea320a4703d..1eb141c3f58 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java @@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; +import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient; import 
org.apache.druid.indexing.common.actions.TaskActionClient; @@ -161,12 +162,12 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask segments) + GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List segments, Map taskReport) { List partitionStats = segments.stream() .map(segment -> toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment)) .collect(Collectors.toList()); - return new GeneratedPartitionsMetadataReport(getId(), partitionStats); + return new GeneratedPartitionsMetadataReport(getId(), partitionStats, taskReport); } /** diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java index f36f4e8de46..ab966e67d5b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java @@ -25,6 +25,7 @@ import com.google.common.base.Preconditions; import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; +import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionClient; @@ -175,11 +176,11 @@ public class PartialRangeSegmentGenerateTask extends PartialSegmentGenerateTask< } @Override - GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List segments) + GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List segments, Map taskReport) { List partitionStats = segments.stream() .map(segment -> toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment)) .collect(Collectors.toList()); - return new GeneratedPartitionsMetadataReport(getId(), partitionStats); + return new GeneratedPartitionsMetadataReport(getId(), partitionStats, taskReport); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java index 2f7f67c7b2b..b8704457226 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java @@ -20,12 +20,18 @@ package org.apache.druid.indexing.common.task.batch.parallel; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.InputSource; +import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; +import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; +import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskToolbox; import 
org.apache.druid.indexing.common.task.BatchAppenderators; import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider; +import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.InputSourceProcessor; import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch; import org.apache.druid.indexing.common.task.SequenceNameFunction; @@ -35,6 +41,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskIn import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher; import org.apache.druid.query.DruidMetrics; import org.apache.druid.segment.incremental.ParseExceptionHandler; +import org.apache.druid.segment.incremental.ParseExceptionReport; import org.apache.druid.segment.incremental.RowIngestionMeters; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.RealtimeIOConfig; @@ -46,10 +53,12 @@ import org.apache.druid.segment.realtime.appenderator.BatchAppenderatorDriver; import org.apache.druid.segment.realtime.appenderator.SegmentAllocator; import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata; import org.apache.druid.timeline.DataSegment; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import java.io.File; import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -64,6 +73,12 @@ abstract class PartialSegmentGenerateTask e private final String supervisorTaskId; private final IndexTaskInputRowIteratorBuilder inputRowIteratorBuilder; + @MonotonicNonNull + private RowIngestionMeters buildSegmentsMeters; + + @MonotonicNonNull + private ParseExceptionHandler parseExceptionHandler; + PartialSegmentGenerateTask( String id, String groupId, @@ -113,7 +128,10 @@ abstract class PartialSegmentGenerateTask e inputSource, toolbox.getIndexingTmpDir() ); - taskClient.report(supervisorTaskId, createGeneratedPartitionsReport(toolbox, segments)); + + Map taskReport = getTaskCompletionReports(); + + taskClient.report(supervisorTaskId, createGeneratedPartitionsReport(toolbox, segments, taskReport)); return TaskStatus.success(getId()); } @@ -131,7 +149,8 @@ abstract class PartialSegmentGenerateTask e */ abstract T createGeneratedPartitionsReport( TaskToolbox toolbox, - List segments + List segments, + Map taskReport ); private List generateSegments( @@ -148,7 +167,7 @@ abstract class PartialSegmentGenerateTask e null ); final FireDepartmentMetrics fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics(); - final RowIngestionMeters buildSegmentsMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters(); + buildSegmentsMeters = toolbox.getRowIngestionMetersFactory().createRowIngestionMeters(); toolbox.addMonitor( new RealtimeMetricsMonitor( @@ -164,7 +183,7 @@ abstract class PartialSegmentGenerateTask e final SegmentAllocatorForBatch segmentAllocator = createSegmentAllocator(toolbox, taskClient); final SequenceNameFunction sequenceNameFunction = segmentAllocator.getSequenceNameFunction(); - final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler( + parseExceptionHandler = new ParseExceptionHandler( buildSegmentsMeters, tuningConfig.isLogParseExceptions(), tuningConfig.getMaxParseExceptions(), @@ -203,7 +222,6 @@ abstract class PartialSegmentGenerateTask e parseExceptionHandler, pushTimeout ); - return pushed.getSegments(); } catch (Exception e) { @@ -218,4 +236,50 
@@ abstract class PartialSegmentGenerateTask e } } } + + /** + * Generate an IngestionStatsAndErrorsTaskReport for the task. + */ + private Map getTaskCompletionReports() + { + return TaskReport.buildTaskReports( + new IngestionStatsAndErrorsTaskReport( + getId(), + new IngestionStatsAndErrorsTaskReportData( + IngestionState.COMPLETED, + getTaskCompletionUnparseableEvents(), + getTaskCompletionRowStats(), + "", + false, // not applicable for parallel subtask + segmentAvailabilityWaitTimeMs + ) + ) + ); + } + + private Map getTaskCompletionUnparseableEvents() + { + Map unparseableEventsMap = new HashMap<>(); + List parseExceptionMessages = IndexTaskUtils.getReportListFromSavedParseExceptions( + parseExceptionHandler.getSavedParseExceptionReports() + ); + + if (parseExceptionMessages != null) { + unparseableEventsMap.put(RowIngestionMeters.BUILD_SEGMENTS, parseExceptionMessages); + } else { + unparseableEventsMap.put(RowIngestionMeters.BUILD_SEGMENTS, ImmutableList.of()); + } + + return unparseableEventsMap; + } + + private Map getTaskCompletionRowStats() + { + Map metrics = new HashMap<>(); + metrics.put( + RowIngestionMeters.BUILD_SEGMENTS, + buildSegmentsMeters.getTotals() + ); + return metrics; + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java index e55e23b7eb5..d82e1f8c571 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java @@ -149,7 +149,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn boolean appendToExisting ) { - final ParallelIndexSupervisorTask task = newTask( + final ParallelIndexSupervisorTask task = createTask( timestampSpec, dimensionsSpec, inputFormat, @@ -165,15 +165,26 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn return runTask(task, expectedTaskStatus); } - Set runTask(Task task, TaskState expectedTaskStatus) + void runTaskAndVerifyStatus(Task task, TaskState expectedTaskStatus) { task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK); TaskStatus taskStatus = getIndexingServiceClient().runAndWait(task); Assert.assertEquals(expectedTaskStatus, taskStatus.getStatusCode()); + } + + Set runTask(Task task, TaskState expectedTaskStatus) + { + runTaskAndVerifyStatus(task, expectedTaskStatus); return getIndexingServiceClient().getPublishedSegments(task); } - protected ParallelIndexSupervisorTask newTask( + Map runTaskAndGetReports(Task task, TaskState expectedTaskStatus) + { + runTaskAndVerifyStatus(task, expectedTaskStatus); + return getIndexingServiceClient().getTaskReport(task.getId()); + } + + protected ParallelIndexSupervisorTask createTask( @Nullable TimestampSpec timestampSpec, @Nullable DimensionsSpec dimensionsSpec, @Nullable InputFormat inputFormat, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index d58ce66016a..b77d9f6ae0a 100644 --- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -42,6 +42,7 @@ import org.apache.druid.data.input.impl.CsvInputFormat; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; @@ -78,7 +79,9 @@ import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.metadata.EntryExistsException; import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable; import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.incremental.ParseExceptionReport; import org.apache.druid.segment.incremental.RowIngestionMetersFactory; +import org.apache.druid.segment.incremental.RowIngestionMetersTotals; import org.apache.druid.segment.join.NoopJoinableFactory; import org.apache.druid.segment.loading.LocalDataSegmentPuller; import org.apache.druid.segment.loading.LocalDataSegmentPusher; @@ -101,6 +104,7 @@ import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Interval; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.rules.TemporaryFolder; @@ -111,6 +115,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -125,6 +130,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase { @@ -297,7 +303,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase null, null, null, - null, + 5, null, null, null @@ -543,6 +549,16 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase return taskRunner.run(injectIfNeeded(task)); } + @Override + public Map getTaskReport(String taskId) + { + final Optional task = getTaskStorage().getTask(taskId); + if (!task.isPresent()) { + return null; + } + return ((ParallelIndexSupervisorTask) task.get()).doGetLiveReports("full"); + } + public TaskContainer getTaskContainer(String taskId) { return taskRunner.getTaskContainer(taskId); @@ -791,6 +807,111 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase } } + protected Map buildExpectedTaskReportSequential( + String taskId, + List expectedUnparseableEvents, + RowIngestionMetersTotals expectedDeterminePartitions, + RowIngestionMetersTotals expectedTotals + ) + { + final Map payload = new HashMap<>(); + + payload.put("ingestionState", IngestionState.COMPLETED); + payload.put( + "unparseableEvents", + ImmutableMap.of("determinePartitions", ImmutableList.of(), "buildSegments", expectedUnparseableEvents) + ); + Map emptyAverageMinuteMap = ImmutableMap.of( + "processed", 0.0, + "unparseable", 0.0, + "thrownAway", 0.0, + "processedWithError", 0.0 + ); + + Map emptyAverages = ImmutableMap.of( + "1m", emptyAverageMinuteMap, + "5m", 
emptyAverageMinuteMap, + "15m", emptyAverageMinuteMap + ); + + payload.put( + "rowStats", + ImmutableMap.of( + "movingAverages", + ImmutableMap.of("determinePartitions", emptyAverages, "buildSegments", emptyAverages), + "totals", + ImmutableMap.of("determinePartitions", expectedDeterminePartitions, "buildSegments", expectedTotals) + ) + ); + + final Map ingestionStatsAndErrors = new HashMap<>(); + ingestionStatsAndErrors.put("taskId", taskId); + ingestionStatsAndErrors.put("payload", payload); + ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors"); + + return Collections.singletonMap("ingestionStatsAndErrors", ingestionStatsAndErrors); + } + + protected Map buildExpectedTaskReportParallel( + String taskId, + List expectedUnparseableEvents, + RowIngestionMetersTotals expectedTotals + ) + { + Map returnMap = new HashMap<>(); + Map ingestionStatsAndErrors = new HashMap<>(); + Map payload = new HashMap<>(); + + payload.put("ingestionState", IngestionState.COMPLETED); + payload.put("unparseableEvents", ImmutableMap.of("buildSegments", expectedUnparseableEvents)); + payload.put("rowStats", ImmutableMap.of("totals", ImmutableMap.of("buildSegments", expectedTotals))); + + ingestionStatsAndErrors.put("taskId", taskId); + ingestionStatsAndErrors.put("payload", payload); + ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors"); + + returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors); + return returnMap; + } + + protected void compareTaskReports( + Map expectedReports, + Map actualReports + ) + { + expectedReports = (Map) expectedReports.get("ingestionStatsAndErrors"); + actualReports = (Map) actualReports.get("ingestionStatsAndErrors"); + + Assert.assertEquals(expectedReports.get("taskId"), actualReports.get("taskId")); + Assert.assertEquals(expectedReports.get("type"), actualReports.get("type")); + + Map expectedPayload = (Map) expectedReports.get("payload"); + Map actualPayload = (Map) actualReports.get("payload"); + Assert.assertEquals(expectedPayload.get("ingestionState"), actualPayload.get("ingestionState")); + Assert.assertEquals(expectedPayload.get("rowStats"), actualPayload.get("rowStats")); + Assert.assertEquals(expectedPayload.get("ingestionState"), actualPayload.get("ingestionState")); + + List expectedParseExceptionReports = + (List) ((Map) + expectedPayload.get("unparseableEvents")).get("buildSegments"); + + List actualParseExceptionReports = + (List) ((Map) + actualPayload.get("unparseableEvents")).get("buildSegments"); + + List expectedMessages = expectedParseExceptionReports + .stream().map(r -> r.getDetails().get(0)).collect(Collectors.toList()); + List actualMessages = actualParseExceptionReports + .stream().map(r -> r.getDetails().get(0)).collect(Collectors.toList()); + Assert.assertEquals(expectedMessages, actualMessages); + + List expectedInputs = expectedParseExceptionReports + .stream().map(ParseExceptionReport::getInput).collect(Collectors.toList()); + List actualInputs = actualParseExceptionReports + .stream().map(ParseExceptionReport::getInput).collect(Collectors.toList()); + Assert.assertEquals(expectedInputs, actualInputs); + } + static class LocalParallelIndexTaskClientFactory implements IndexTaskClientFactory { private final ConcurrentMap tasks; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java index 
8152aa4dbda..f95a8d01b08 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java @@ -36,6 +36,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.query.scan.ScanResultValue; +import org.apache.druid.segment.incremental.RowIngestionMetersTotals; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; import org.apache.druid.timeline.partition.HashPartitionFunction; @@ -161,15 +162,14 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh public void testRun() throws Exception { final Integer maxRowsPerSegment = numShards == null ? 10 : null; - final Set publishedSegments = runTestTask( + final Set publishedSegments = runTask(createTask( new HashedPartitionsSpec( maxRowsPerSegment, numShards, ImmutableList.of("dim1", "dim2") ), - TaskState.SUCCESS, false - ); + ), TaskState.SUCCESS); final Map expectedIntervalToNumSegments = computeExpectedIntervalToNumSegments( maxRowsPerSegment, @@ -182,16 +182,14 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh public void testRunWithHashPartitionFunction() throws Exception { final Integer maxRowsPerSegment = numShards == null ? 10 : null; - final Set publishedSegments = runTestTask( + final Set publishedSegments = runTask(createTask( new HashedPartitionsSpec( maxRowsPerSegment, numShards, ImmutableList.of("dim1", "dim2"), HashPartitionFunction.MURMUR3_32_ABS ), - TaskState.SUCCESS, - false - ); + false), TaskState.SUCCESS); final Map expectedIntervalToNumSegments = computeExpectedIntervalToNumSegments( maxRowsPerSegment, numShards @@ -199,6 +197,39 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh assertHashedPartition(publishedSegments, expectedIntervalToNumSegments); } + @Test + public void testRowStats() + { + final Integer maxRowsPerSegment = numShards == null ? 10 : null; + ParallelIndexSupervisorTask task = createTask( + new HashedPartitionsSpec( + maxRowsPerSegment, + numShards, + ImmutableList.of("dim1", "dim2"), + HashPartitionFunction.MURMUR3_32_ABS), + false); + RowIngestionMetersTotals expectedTotals = new RowIngestionMetersTotals(200, 0, 0, 0); + Map expectedReports; + if (maxNumConcurrentSubTasks <= 1) { + expectedReports = buildExpectedTaskReportSequential( + task.getId(), + ImmutableList.of(), + numShards == null ? 
expectedTotals : new RowIngestionMetersTotals(0, 0, 0, 0), + expectedTotals + ); + } else { + // when useInputFormatApi is false, maxConcurrentSubTasks=2 and it uses the single phase runner + // instead of sequential runner + expectedReports = buildExpectedTaskReportParallel( + task.getId(), + ImmutableList.of(), + expectedTotals + ); + } + Map actualReports = runTaskAndGetReports(task, TaskState.SUCCESS); + compareTaskReports(expectedReports, actualReports); + } + private Map computeExpectedIntervalToNumSegments( @Nullable Integer maxRowsPerSegment, @Nullable Integer numShards @@ -224,27 +255,26 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh { final Set publishedSegments = new HashSet<>(); publishedSegments.addAll( - runTestTask( - new HashedPartitionsSpec(null, numShards, ImmutableList.of("dim1", "dim2")), - TaskState.SUCCESS, - false - ) + runTask( + createTask( + new HashedPartitionsSpec(null, numShards, ImmutableList.of("dim1", "dim2")), + false), + TaskState.SUCCESS) ); // Append publishedSegments.addAll( - runTestTask( - new DynamicPartitionsSpec(5, null), - TaskState.SUCCESS, - true - ) - ); + runTask( + createTask( + new DynamicPartitionsSpec(5, null), + true), + TaskState.SUCCESS)); // And append again publishedSegments.addAll( - runTestTask( - new DynamicPartitionsSpec(10, null), - TaskState.SUCCESS, - true - ) + runTask( + createTask( + new DynamicPartitionsSpec(10, null), + true), + TaskState.SUCCESS) ); final Map> intervalToSegments = new HashMap<>(); @@ -275,14 +305,13 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh } } - private Set runTestTask( + private ParallelIndexSupervisorTask createTask( PartitionsSpec partitionsSpec, - TaskState expectedTaskState, boolean appendToExisting ) { if (isUseInputFormatApi()) { - return runTestTask( + return createTask( TIMESTAMP_SPEC, DIMENSIONS_SPEC, INPUT_FORMAT, @@ -292,11 +321,10 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh "test_*", partitionsSpec, maxNumConcurrentSubTasks, - expectedTaskState, appendToExisting ); } else { - return runTestTask( + return createTask( null, null, null, @@ -306,7 +334,6 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh "test_*", partitionsSpec, maxNumConcurrentSubTasks, - expectedTaskState, appendToExisting ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index 109d3ab5b3c..41e9c676d39 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -355,13 +355,13 @@ public class ParallelIndexSupervisorTaskTest createRangePartitionStat(day2, 7), createRangePartitionStat(day1, 0), createRangePartitionStat(day2, 1) - ))); + ), null)); taskIdToReport.put(task2, new GeneratedPartitionsReport(task2, Arrays.asList( createRangePartitionStat(day1, 4), createRangePartitionStat(day1, 6), createRangePartitionStat(day2, 1), createRangePartitionStat(day1, 1) - ))); + ), null)); Map> partitionToLocations = ParallelIndexSupervisorTask.getPartitionToLocations(taskIdToReport); diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java index f5b76e52ded..c9cb2136d1e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java @@ -41,6 +41,7 @@ import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.query.scan.ScanResultValue; +import org.apache.druid.segment.incremental.RowIngestionMetersTotals; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.DimensionRangeShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec; @@ -225,22 +226,39 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP public void createsCorrectRangePartitions() throws Exception { int targetRowsPerSegment = NUM_ROW * 2 / DIM_FILE_CARDINALITY / NUM_PARTITION; - final Set publishedSegments = runTestTask( + final Set publishedSegments = runTask(runTestTask( new DimensionRangePartitionsSpec( targetRowsPerSegment, null, Collections.singletonList(DIM1), false ), - useMultivalueDim ? TaskState.FAILED : TaskState.SUCCESS, false - ); + ), useMultivalueDim ? TaskState.FAILED : TaskState.SUCCESS); if (!useMultivalueDim) { assertRangePartitions(publishedSegments); } } + @Test + public void testRowStats() + { + if (useMultivalueDim) { + return; + } + final int targetRowsPerSegment = NUM_ROW / DIM_FILE_CARDINALITY / NUM_PARTITION; + ParallelIndexSupervisorTask task = runTestTask( + new SingleDimensionPartitionsSpec(targetRowsPerSegment, null, DIM1, false), + false); + Map expectedReports = buildExpectedTaskReportParallel( + task.getId(), + ImmutableList.of(), + new RowIngestionMetersTotals(600, 0, 0, 0)); + Map actualReports = runTaskAndGetReports(task, TaskState.SUCCESS); + compareTaskReports(expectedReports, actualReports); + } + @Test public void testAppendLinearlyPartitionedSegmentsToHashPartitionedDatasourceSuccessfullyAppend() { @@ -250,32 +268,29 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP final int targetRowsPerSegment = NUM_ROW / DIM_FILE_CARDINALITY / NUM_PARTITION; final Set publishedSegments = new HashSet<>(); publishedSegments.addAll( - runTestTask( + runTask(runTestTask( new SingleDimensionPartitionsSpec( targetRowsPerSegment, null, DIM1, false ), - TaskState.SUCCESS, false - ) + ), TaskState.SUCCESS) ); // Append publishedSegments.addAll( - runTestTask( + runTask(runTestTask( new DynamicPartitionsSpec(5, null), - TaskState.SUCCESS, true - ) + ), TaskState.SUCCESS) ); // And append again publishedSegments.addAll( - runTestTask( + runTask(runTestTask( new DynamicPartitionsSpec(10, null), - TaskState.SUCCESS, true - ) + ), TaskState.SUCCESS) ); final Map> intervalToSegments = new HashMap<>(); @@ -306,14 +321,13 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP } } - private Set runTestTask( + private ParallelIndexSupervisorTask runTestTask( PartitionsSpec partitionsSpec, - TaskState expectedTaskState, boolean appendToExisting ) { if (isUseInputFormatApi()) { - return 
runTestTask( + return createTask( TIMESTAMP_SPEC, DIMENSIONS_SPEC, INPUT_FORMAT, @@ -323,11 +337,10 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP TEST_FILE_NAME_PREFIX + "*", partitionsSpec, maxNumConcurrentSubTasks, - expectedTaskState, appendToExisting ); } else { - return runTestTask( + return createTask( null, null, null, @@ -337,7 +350,6 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP TEST_FILE_NAME_PREFIX + "*", partitionsSpec, maxNumConcurrentSubTasks, - expectedTaskState, appendToExisting ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java index b57e55b1181..3a4bcd0a099 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java @@ -27,7 +27,6 @@ import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.LocalInputSource; import org.apache.druid.data.input.impl.StringInputRowParser; -import org.apache.druid.indexer.IngestionState; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskToolbox; @@ -71,14 +70,12 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; @RunWith(Parameterized.class) public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSupervisorTaskTest @@ -335,7 +332,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv Collections.emptyList() ); Map actualReports = task.doGetLiveReports("full"); - Map expectedReports = getExpectedTaskReportParallel( + Map expectedReports = buildExpectedTaskReportParallel( task.getId(), ImmutableList.of( new ParseExceptionReport( @@ -360,68 +357,6 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv compareTaskReports(expectedReports, actualReports); } - private void compareTaskReports( - Map expectedReports, - Map actualReports - ) - { - expectedReports = (Map) expectedReports.get("ingestionStatsAndErrors"); - actualReports = (Map) actualReports.get("ingestionStatsAndErrors"); - - Assert.assertEquals(expectedReports.get("taskId"), actualReports.get("taskId")); - Assert.assertEquals(expectedReports.get("type"), actualReports.get("type")); - - Map expectedPayload = (Map) expectedReports.get("payload"); - Map actualPayload = (Map) actualReports.get("payload"); - Assert.assertEquals(expectedPayload.get("ingestionState"), actualPayload.get("ingestionState")); - Assert.assertEquals(expectedPayload.get("rowStats"), actualPayload.get("rowStats")); - Assert.assertEquals(expectedPayload.get("ingestionState"), actualPayload.get("ingestionState")); - - List expectedParseExceptionReports = (List) ((Map) expectedPayload.get("unparseableEvents")) - .get("buildSegments"); - - List actualParseExceptionReports = (List) 
((Map) actualPayload.get("unparseableEvents")) - .get("buildSegments"); - - List expectedMessages = expectedParseExceptionReports.stream().map((r) -> { - return r.getDetails().get(0); - }).collect(Collectors.toList()); - List actualMessages = actualParseExceptionReports.stream().map((r) -> { - return r.getDetails().get(0); - }).collect(Collectors.toList()); - Assert.assertEquals(expectedMessages, actualMessages); - - List expectedInputs = expectedParseExceptionReports.stream().map((r) -> { - return r.getInput(); - }).collect(Collectors.toList()); - List actualInputs = actualParseExceptionReports.stream().map((r) -> { - return r.getInput(); - }).collect(Collectors.toList()); - Assert.assertEquals(expectedInputs, actualInputs); - } - - private Map getExpectedTaskReportParallel( - String taskId, - List expectedUnparseableEvents, - RowIngestionMetersTotals expectedTotals - ) - { - Map returnMap = new HashMap<>(); - Map ingestionStatsAndErrors = new HashMap<>(); - Map payload = new HashMap<>(); - - payload.put("ingestionState", IngestionState.COMPLETED); - payload.put("unparseableEvents", ImmutableMap.of("buildSegments", expectedUnparseableEvents)); - payload.put("rowStats", ImmutableMap.of("totals", ImmutableMap.of("buildSegments", expectedTotals))); - - ingestionStatsAndErrors.put("taskId", taskId); - ingestionStatsAndErrors.put("payload", payload); - ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors"); - - returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors); - return returnMap; - } - // // Ingest all data. @@ -475,15 +410,16 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv Map expectedReports; if (useInputFormatApi) { - expectedReports = getExpectedTaskReportSequential( + expectedReports = buildExpectedTaskReportSequential( task.getId(), expectedUnparseableEvents, + new RowIngestionMetersTotals(0, 0, 0, 0), expectedTotals ); } else { // when useInputFormatApi is false, maxConcurrentSubTasks=2 and it uses the single phase runner // instead of sequential runner - expectedReports = getExpectedTaskReportParallel( + expectedReports = buildExpectedTaskReportParallel( task.getId(), expectedUnparseableEvents, expectedTotals @@ -494,65 +430,6 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv System.out.println(actualReports); } - private Map getExpectedTaskReportSequential( - String taskId, - List expectedUnparseableEvents, - RowIngestionMetersTotals expectedTotals - ) - { - Map returnMap = new HashMap<>(); - Map ingestionStatsAndErrors = new HashMap<>(); - Map payload = new HashMap<>(); - - payload.put("ingestionState", IngestionState.COMPLETED); - payload.put( - "unparseableEvents", - ImmutableMap.of( - "determinePartitions", ImmutableList.of(), - "buildSegments", expectedUnparseableEvents - ) - ); - Map emptyAverageMinuteMap = ImmutableMap.of( - "processed", 0.0, - "unparseable", 0.0, - "thrownAway", 0.0, - "processedWithError", 0.0 - ); - - Map emptyAverages = ImmutableMap.of( - "1m", emptyAverageMinuteMap, - "5m", emptyAverageMinuteMap, - "15m", emptyAverageMinuteMap - ); - - payload.put( - "rowStats", - ImmutableMap.of( - "movingAverages", - ImmutableMap.of( - "determinePartitions", - emptyAverages, - "buildSegments", - emptyAverages - ), - "totals", - ImmutableMap.of( - "determinePartitions", - new RowIngestionMetersTotals(0, 0, 0, 0), - "buildSegments", - expectedTotals - ) - ) - ); - - ingestionStatsAndErrors.put("taskId", taskId); - ingestionStatsAndErrors.put("payload", payload); - 
ingestionStatsAndErrors.put("type", "ingestionStatsAndErrors"); - - returnMap.put("ingestionStatsAndErrors", ingestionStatsAndErrors); - return returnMap; - } - @Test public void testPublishEmptySegments() { diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/MutableRowIngestionMeters.java b/processing/src/main/java/org/apache/druid/segment/incremental/MutableRowIngestionMeters.java new file mode 100644 index 00000000000..0888c19c594 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/incremental/MutableRowIngestionMeters.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.incremental; + +import java.util.Map; + +public class MutableRowIngestionMeters implements RowIngestionMeters +{ + private long processed; + private long processedWithError; + private long unparseable; + private long thrownAway; + + public MutableRowIngestionMeters() + { + this.processed = 0; + this.processedWithError = 0; + this.unparseable = 0; + this.thrownAway = 0; + } + + @Override + public long getProcessed() + { + return processed; + } + + @Override + public void incrementProcessed() + { + processed++; + } + + @Override + public long getProcessedWithError() + { + return processedWithError; + } + + @Override + public void incrementProcessedWithError() + { + processedWithError++; + } + + @Override + public long getUnparseable() + { + return unparseable; + } + + @Override + public void incrementUnparseable() + { + unparseable++; + } + + @Override + public long getThrownAway() + { + return thrownAway; + } + + @Override + public void incrementThrownAway() + { + thrownAway++; + } + + @Override + public RowIngestionMetersTotals getTotals() + { + return new RowIngestionMetersTotals(processed, processedWithError, thrownAway, unparseable); + } + + @Override + public Map getMovingAverages() + { + throw new UnsupportedOperationException(); + } + + public void addRowIngestionMetersTotals(RowIngestionMetersTotals rowIngestionMetersTotals) + { + this.processed += rowIngestionMetersTotals.getProcessed(); + this.processedWithError += rowIngestionMetersTotals.getProcessedWithError(); + this.unparseable += rowIngestionMetersTotals.getUnparseable(); + this.thrownAway += rowIngestionMetersTotals.getThrownAway(); + } +} diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/MutableRowIngestionMetersTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/MutableRowIngestionMetersTest.java new file mode 100644 index 00000000000..5d32c5994b2 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/incremental/MutableRowIngestionMetersTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.incremental; + +import org.junit.Assert; +import org.junit.Test; + +public class MutableRowIngestionMetersTest +{ + @Test + public void testIncrement() + { + MutableRowIngestionMeters mutableRowIngestionMeters = new MutableRowIngestionMeters(); + mutableRowIngestionMeters.incrementProcessed(); + mutableRowIngestionMeters.incrementProcessedWithError(); + mutableRowIngestionMeters.incrementUnparseable(); + mutableRowIngestionMeters.incrementThrownAway(); + Assert.assertEquals(mutableRowIngestionMeters.getTotals(), new RowIngestionMetersTotals(1, 1, 1, 1)); + } + + @Test + public void testAddRowIngestionMetersTotals() + { + MutableRowIngestionMeters mutableRowIngestionMeters = new MutableRowIngestionMeters(); + RowIngestionMetersTotals rowIngestionMetersTotals = new RowIngestionMetersTotals(10, 1, 0, 1); + mutableRowIngestionMeters.addRowIngestionMetersTotals(rowIngestionMetersTotals); + Assert.assertEquals(mutableRowIngestionMeters.getTotals(), rowIngestionMetersTotals); + } +}
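
For reference, below is a minimal, self-contained sketch (not part of the patch) of how the new MutableRowIngestionMeters is exercised by the supervisor-side aggregation added above. The meter and totals classes, their package, and their method signatures are taken from this diff; the class name RowStatsAggregationSketch and the sample counts are hypothetical.

// Illustrative sketch only -- not part of this patch. It folds per-subtask
// "buildSegments" totals into one result the same way ParallelIndexSupervisorTask
// does after this change. Sample numbers are made up.
import org.apache.druid.segment.incremental.MutableRowIngestionMeters;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;

import java.util.Arrays;
import java.util.List;

public class RowStatsAggregationSketch
{
  public static void main(String[] args)
  {
    // Totals as they would arrive from completed PartialSegmentGenerateTask reports.
    // Constructor argument order is (processed, processedWithError, thrownAway, unparseable).
    List<RowIngestionMetersTotals> subTaskTotals = Arrays.asList(
        new RowIngestionMetersTotals(100, 2, 1, 0),
        new RowIngestionMetersTotals(80, 0, 0, 3),
        new RowIngestionMetersTotals(20, 1, 0, 0)
    );

    // The supervisor accumulates every sub-task's totals into a single mutable meter,
    // replacing the four loose long counters used before this change.
    final MutableRowIngestionMeters buildSegmentsRowStats = new MutableRowIngestionMeters();
    for (RowIngestionMetersTotals totals : subTaskTotals) {
      buildSegmentsRowStats.addRowIngestionMetersTotals(totals);
    }

    // getTotals() snapshots the accumulated counts; this object is what ends up under
    // rowStats -> totals -> buildSegments in the supervisor's task report.
    RowIngestionMetersTotals merged = buildSegmentsRowStats.getTotals();
    System.out.println("processed=" + merged.getProcessed()
                       + ", processedWithError=" + merged.getProcessedWithError()
                       + ", thrownAway=" + merged.getThrownAway()
                       + ", unparseable=" + merged.getUnparseable());
    // Expected: processed=200, processedWithError=3, thrownAway=1, unparseable=3
  }
}

Using a shared accumulator rather than ad hoc long counters is what lets the single-phase path and the new multi-phase path reuse the same helpers (getBuildSegmentsStatsFromTaskReport, getRowStatsAndUnparseableEventsForRunningTasks, createStatsAndErrorsReport) introduced in ParallelIndexSupervisorTask above.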