diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md
index 5afbadb3d43..4a059a75c63 100644
--- a/docs/ingestion/tasks.md
+++ b/docs/ingestion/tasks.md
@@ -67,12 +67,14 @@ An example output is shown below:
     "rowStats": {
       "determinePartitions": {
         "processed": 0,
+        "processedBytes": 0,
         "processedWithError": 0,
         "thrownAway": 0,
         "unparseable": 0
       },
       "buildSegments": {
         "processed": 5390324,
+        "processedBytes": 5109573212,
         "processedWithError": 0,
         "thrownAway": 0,
         "unparseable": 0
@@ -118,18 +120,21 @@ An example output is shown below:
     "buildSegments": {
       "5m": {
         "processed": 3.392158326408501,
+        "processedBytes": 627.5492903856,
         "unparseable": 0,
         "thrownAway": 0,
         "processedWithError": 0
       },
       "15m": {
         "processed": 1.736165476881023,
+        "processedBytes": 321.1906130223,
         "unparseable": 0,
         "thrownAway": 0,
         "processedWithError": 0
       },
       "1m": {
         "processed": 4.206417693750045,
+        "processedBytes": 778.1872733438,
         "unparseable": 0,
         "thrownAway": 0,
         "processedWithError": 0
@@ -139,6 +144,7 @@ An example output is shown below:
     "totals": {
       "buildSegments": {
         "processed": 1994,
+        "processedBytes": 3425110,
         "processedWithError": 0,
         "thrownAway": 0,
         "unparseable": 0
@@ -168,6 +174,7 @@ Only batch tasks have the DETERMINE_PARTITIONS phase. Realtime tasks such as tho
 the `rowStats` map contains information about row counts. There is one entry for each ingestion phase. The definitions of the different row counts are shown below:
 
 - `processed`: Number of rows successfully ingested without parsing errors
+- `processedBytes`: Total number of uncompressed bytes processed by the task. This counts the bytes of all rows, including those reported under `processedWithError`, `unparseable`, or `thrownAway`.
 - `processedWithError`: Number of rows that were ingested, but contained a parsing error within one or more columns. This typically occurs where input rows have a parseable structure but invalid types for columns, such as passing in a non-numeric String value for a numeric column.
 - `thrownAway`: Number of rows skipped. This includes rows with timestamps that were outside of the ingestion task's defined time interval and rows that were filtered out with a [`transformSpec`](./ingestion-spec.md#transformspec), but doesn't include the rows skipped by explicit user configurations. For example, the rows skipped by `skipHeaderRows` or `hasHeaderRow` in the CSV format are not counted.
 - `unparseable`: Number of rows that could not be parsed at all and were discarded. This tracks input rows without a parseable structure, such as passing in non-JSON data when using a JSON parser.
@@ -188,24 +195,27 @@
 http://<middlemanager-host>:<worker-port>/druid/worker/v1/chat/<task-id>/rowStats
 
 An example report is shown below. The `movingAverages` section contains 1 minute, 5 minute, and 15 minute moving averages of increases to the four row counters, which have the same definitions as those in the completion report. The `totals` section shows the current totals.
-```
+```json
 {
   "movingAverages": {
     "buildSegments": {
       "5m": {
         "processed": 3.392158326408501,
+        "processedBytes": 627.5492903856,
         "unparseable": 0,
         "thrownAway": 0,
         "processedWithError": 0
       },
       "15m": {
         "processed": 1.736165476881023,
+        "processedBytes": 321.1906130223,
         "unparseable": 0,
         "thrownAway": 0,
         "processedWithError": 0
       },
      "1m": {
         "processed": 4.206417693750045,
+        "processedBytes": 778.1872733438,
         "unparseable": 0,
         "thrownAway": 0,
         "processedWithError": 0
@@ -215,6 +225,7 @@ An example report is shown below. The `movingAverages` section contains 1 minute
   "totals": {
     "buildSegments": {
       "processed": 1994,
+      "processedBytes": 3425110,
       "processedWithError": 0,
       "thrownAway": 0,
       "unparseable": 0
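Note (not part of the patch): for reviewers who want to see the new field from a client's perspective, below is a minimal sketch that polls the live `rowStats` endpoint documented above and prints the `processedBytes` total for the `buildSegments` phase. The class name, host, port, and task ID are placeholders, and Jackson is assumed to be available on the classpath; the JSON shape follows the live-report example in the docs change above.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class RowStatsProbe
{
  public static void main(String[] args) throws Exception
  {
    // Placeholder host, port, and task ID; substitute the Peon serving the task.
    String endpoint = "http://localhost:8100/druid/worker/v1/chat/example-task-id/rowStats";
    HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
    conn.setRequestProperty("Accept", "application/json");

    try (InputStream in = conn.getInputStream()) {
      JsonNode report = new ObjectMapper().readTree(in);
      // Shape matches the live report above: totals -> buildSegments -> processed / processedBytes.
      JsonNode totals = report.path("totals").path("buildSegments");
      long processed = totals.path("processed").asLong();
      long processedBytes = totals.path("processedBytes").asLong();
      System.out.printf("processed=%d rows, processedBytes=%d bytes%n", processed, processedBytes);
    }
    finally {
      conn.disconnect();
    }
  }
}
```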
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/ChannelCounters.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/ChannelCounters.java
index 461b066ce9d..3578fa21a6e 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/ChannelCounters.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/ChannelCounters.java
@@ -58,6 +58,11 @@ public class ChannelCounters implements QueryCounter
     add(NO_PARTITION, 1, 0, 0, 0);
   }
 
+  public void incrementBytes(long bytes)
+  {
+    add(NO_PARTITION, 0, bytes, 0, 0);
+  }
+
   public void incrementFileCount()
   {
     add(NO_PARTITION, 0, 0, 0, 1);
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java
index 2c51f74c7c8..fe4e0b5a62b 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java
@@ -26,6 +26,7 @@ import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.InputSource;
 import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputStats;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InlineInputSource;
 import org.apache.druid.data.input.impl.TimestampSpec;
@@ -52,6 +53,7 @@ import org.apache.druid.msq.util.DimensionSchemaUtils;
 import org.apache.druid.segment.RowAdapters;
 import org.apache.druid.segment.RowBasedSegment;
 import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
 import org.apache.druid.timeline.SegmentId;
 
 import java.io.File;
@@ -140,6 +142,7 @@ public class ExternalInputSliceReader implements InputSliceReader
       final InputSourceReader reader;
       final boolean incrementCounters = isFileBasedInputSource(inputSource);
 
+      final InputStats inputStats = new SimpleRowIngestionMeters();
       if (incrementCounters) {
         reader = new CountableInputSourceReader(
             inputSource.reader(schema, inputFormat, temporaryDirectory),
@@ -159,7 +162,7 @@
           public CloseableIterator<InputRow> make()
           {
             try {
-              CloseableIterator<InputRow> baseIterator = reader.read();
+              CloseableIterator<InputRow> baseIterator = reader.read(inputStats);
               return new CloseableIterator<InputRow>() {
                 private InputRow next = null;
 
@@ -216,6 +219,7 @@ public class ExternalInputSliceReader implements InputSliceReader
             // has one file.
             if (incrementCounters) {
              channelCounters.incrementFileCount();
+              channelCounters.incrementBytes(inputStats.getProcessedBytes());
             }
           }
           catch (IOException e) {
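Note (not part of the patch): a minimal sketch of the flow the two changes above combine into — an `InputStats` instance is handed to `InputSourceReader.read()`, accumulates uncompressed byte counts while rows are consumed, and is then flushed into the channel counters. The class and helper method names are hypothetical; the Druid types and calls are the ones used in `ExternalInputSliceReader` above.

```java
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;

import java.io.IOException;

public class ProcessedBytesSketch
{
  // Hypothetical helper: drain a reader and report the uncompressed bytes it consumed.
  static long readAndReportBytes(InputSourceReader reader, ChannelCounters channelCounters) throws IOException
  {
    // SimpleRowIngestionMeters doubles as the InputStats sink, as in ExternalInputSliceReader above.
    final InputStats inputStats = new SimpleRowIngestionMeters();
    try (CloseableIterator<InputRow> rows = reader.read(inputStats)) {
      while (rows.hasNext()) {
        rows.next(); // byte counts accumulate in inputStats as rows are pulled
      }
    }
    // Mirror of the change above: surface the accumulated bytes on the worker's channel counters.
    channelCounters.incrementBytes(inputStats.getProcessedBytes());
    return inputStats.getProcessedBytes();
  }
}
```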
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
index cf4e4052d35..1790bb1137e 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
@@ -30,6 +30,7 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.msq.indexing.error.ColumnNameRestrictedFault;
 import org.apache.druid.msq.indexing.error.RowTooLargeFault;
+import org.apache.druid.msq.test.CounterSnapshotBuilder;
 import org.apache.druid.msq.test.MSQTestBase;
 import org.apache.druid.msq.test.MSQTestFileUtils;
 import org.apache.druid.msq.util.MultiStageQueryContext;
@@ -106,6 +107,12 @@ public class MSQInsertTest extends MSQTestBase
                              0
                          )))
                      .setExpectedResultRows(ImmutableList.of(new Object[]{1466985600000L, 20L}))
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotBuilder
+                             .with().rows(20).bytes(toRead.length()).files(1).totalFiles(1)
+                             .buildChannelCounter(),
+                         0, 0, "input0"
+                     )
                      .verifyResults();
   }
 
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index 49bb7f6d090..1cd8d66bbcb 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.msq.exec;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.msq.test.CounterSnapshotBuilder;
 import org.apache.druid.msq.test.MSQTestBase;
 import org.apache.druid.msq.test.MSQTestFileUtils;
 import org.apache.druid.segment.column.ColumnType;
@@ -138,6 +139,12 @@ public class MSQReplaceTest extends MSQTestBase
                              new Object[]{1466992800000L, 6L}
                          )
                      )
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotBuilder
+                             .with().rows(20).bytes(toRead.length()).files(1).totalFiles(1)
+                             .buildChannelCounter(),
+                         0, 0, "input0"
+                     )
                      .verifyResults();
   }
 
@@ -176,6 +183,12 @@ public class MSQReplaceTest extends MSQTestBase
                              new Object[]{1466989200000L, "Kolega2357"}
                          )
                      )
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotBuilder
+                             .with().rows(20).bytes(toRead.length()).files(1).totalFiles(1)
+                             .buildChannelCounter(),
+                         0, 0, "input0"
+                     )
                      .verifyResults();
   }
 
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
index b6305e13542..04d3a6a30c2 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
@@ -33,6 +33,7 @@ import org.apache.druid.msq.indexing.MSQSpec;
 import org.apache.druid.msq.indexing.MSQTuningConfig;
 import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault;
 import org.apache.druid.msq.shuffle.DurableStorageUtils;
+import org.apache.druid.msq.test.CounterSnapshotBuilder;
 import org.apache.druid.msq.test.MSQTestBase;
 import org.apache.druid.msq.test.MSQTestFileUtils;
 import org.apache.druid.query.InlineDataSource;
@@ -794,7 +795,14 @@ public class MSQSelectTest extends MSQTestBase
                              )
                          ))
                      .tuningConfig(MSQTuningConfig.defaultConfig())
-                     .build())
+                     .build()
+                 )
+                 .setExpectedCountersForStageWorkerChannel(
+                     CounterSnapshotBuilder
+                         .with().rows(20).bytes(toRead.length()).files(1).totalFiles(1)
+                         .buildChannelCounter(),
+                     0, 0, "input0"
+                 )
                      .verifyResults();
   }
 
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CounterSnapshotBuilder.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CounterSnapshotBuilder.java
new file mode 100644
index 00000000000..0d89c0734ff
--- /dev/null
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CounterSnapshotBuilder.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.test;
+
+import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.msq.counters.QueryCounterSnapshot;
+
+/**
+ * Utility class to build instances of {@link QueryCounterSnapshot} used in tests.
+ */
+public class CounterSnapshotBuilder
+{
+  private long[] rows;
+  private long[] bytes;
+  private long[] frames;
+  private long[] files;
+  private long[] totalFiles;
+
+  public static CounterSnapshotBuilder with()
+  {
+    return new CounterSnapshotBuilder();
+  }
+
+  public CounterSnapshotBuilder rows(long... rows)
+  {
+    this.rows = rows;
+    return this;
+  }
+
+  public CounterSnapshotBuilder bytes(long... bytes)
+  {
+    this.bytes = bytes;
+    return this;
+  }
+
+  public CounterSnapshotBuilder frames(long... frames)
+  {
+    this.frames = frames;
+    return this;
+  }
+
+  public CounterSnapshotBuilder files(long... files)
+  {
+    this.files = files;
+    return this;
+  }
+
+  public CounterSnapshotBuilder totalFiles(long... totalFiles)
+  {
+    this.totalFiles = totalFiles;
+    return this;
+  }
+
+  public QueryCounterSnapshot buildChannelCounter()
+  {
+    return new ChannelCounters.Snapshot(rows, bytes, frames, files, totalFiles);
+  }
+}
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index 43ae5e0b0c9..5dfe528fbaf 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -69,6 +69,9 @@ import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.metadata.input.InputSourceModule;
+import org.apache.druid.msq.counters.CounterSnapshots;
+import org.apache.druid.msq.counters.CounterSnapshotsTree;
+import org.apache.druid.msq.counters.QueryCounterSnapshot;
 import org.apache.druid.msq.exec.Controller;
 import org.apache.druid.msq.exec.WorkerMemoryParameters;
 import org.apache.druid.msq.guice.MSQDurableStorageModule;
@@ -168,7 +171,6 @@ import org.mockito.Mockito;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
-
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
@@ -176,6 +178,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -593,14 +596,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
       };
       segmentManager.addSegment(segment);
     }
-    return new Supplier<Pair<Segment, Closeable>>()
-    {
-      @Override
-      public Pair<Segment, Closeable> get()
-      {
-        return new Pair<>(segmentManager.getSegment(segmentId), Closer.create());
-      }
-    };
+    return () -> new Pair<>(segmentManager.getSegment(segmentId), Closer.create());
   }
 
   public SelectTester testSelectQuery()
@@ -745,7 +741,7 @@
     }
   }
 
-  public abstract class MSQTester>
+  public abstract class MSQTester>
   {
     protected String sql = null;
     protected Map<String, Object> queryContext = DEFAULT_MSQ_CONTEXT;
@@ -758,78 +754,90 @@
     protected Matcher<Throwable> expectedExecutionErrorMatcher = null;
     protected MSQFault expectedMSQFault = null;
     protected Class<? extends MSQFault> expectedMSQFaultClass = null;
+    protected final Map<Integer, Map<Integer, Map<String, QueryCounterSnapshot>>>
+        expectedStageWorkerChannelToCounters = new HashMap<>();
 
     private boolean hasRun = false;
 
-    @SuppressWarnings("unchecked")
     public Builder setSql(String sql)
     {
       this.sql = sql;
-      return (Builder) this;
+      return asBuilder();
     }
 
-    @SuppressWarnings("unchecked")
     public Builder setQueryContext(Map<String, Object> queryContext)
     {
       this.queryContext = queryContext;
-      return (Builder) this;
+      return asBuilder();
     }
 
-    @SuppressWarnings("unchecked")
     public Builder setExpectedRowSignature(RowSignature expectedRowSignature)
     {
       Preconditions.checkArgument(!expectedRowSignature.equals(RowSignature.empty()), "Row signature cannot be empty");
       this.expectedRowSignature = expectedRowSignature;
-      return (Builder) this;
+      return asBuilder();
     }
 
-    @SuppressWarnings("unchecked")
     public Builder setExpectedSegment(Set<SegmentId> expectedSegments)
     {
       Preconditions.checkArgument(!expectedSegments.isEmpty(), "Segments cannot be empty");
       this.expectedSegments = expectedSegments;
-      return (Builder) this;
+      return asBuilder();
     }
 
-    @SuppressWarnings("unchecked")
     public Builder setExpectedResultRows(List<Object[]> expectedResultRows)
     {
       Preconditions.checkArgument(expectedResultRows.size() > 0, "Results rows cannot be empty");
       this.expectedResultRows = expectedResultRows;
-      return (Builder) this;
+      return asBuilder();
     }
 
-    @SuppressWarnings("unchecked")
     public Builder setExpectedMSQSpec(MSQSpec expectedMSQSpec)
     {
       this.expectedMSQSpec = expectedMSQSpec;
-      return (Builder) this;
+      return asBuilder();
     }
 
-    @SuppressWarnings("unchecked")
     public Builder setExpectedValidationErrorMatcher(Matcher<Throwable> expectedValidationErrorMatcher)
     {
       this.expectedValidationErrorMatcher = expectedValidationErrorMatcher;
-      return (Builder) this;
+      return asBuilder();
     }
 
-    @SuppressWarnings("unchecked")
     public Builder setExpectedExecutionErrorMatcher(Matcher<Throwable> expectedExecutionErrorMatcher)
     {
       this.expectedExecutionErrorMatcher = expectedExecutionErrorMatcher;
-      return (Builder) this;
+      return asBuilder();
     }
 
-    @SuppressWarnings("unchecked")
     public Builder setExpectedMSQFault(MSQFault MSQFault)
     {
       this.expectedMSQFault = MSQFault;
-      return (Builder) this;
+      return asBuilder();
     }
 
     public Builder setExpectedMSQFaultClass(Class<? extends MSQFault> expectedMSQFaultClass)
     {
       this.expectedMSQFaultClass = expectedMSQFaultClass;
+      return asBuilder();
+    }
+
+    public Builder setExpectedCountersForStageWorkerChannel(
+        QueryCounterSnapshot counterSnapshot,
+        int stage,
+        int worker,
+        String channel
+    )
+    {
+      this.expectedStageWorkerChannelToCounters.computeIfAbsent(stage, s -> new HashMap<>())
+                                               .computeIfAbsent(worker, w -> new HashMap<>())
+                                               .put(channel, counterSnapshot);
+      return asBuilder();
+    }
+
+    @SuppressWarnings("unchecked")
+    private Builder asBuilder()
+    {
       return (Builder) this;
     }
 
@@ -847,6 +855,39 @@
       MatcherAssert.assertThat(e, expectedValidationErrorMatcher);
     }
 
+    protected void verifyCounters(CounterSnapshotsTree counterSnapshotsTree)
+    {
+      Assert.assertNotNull(counterSnapshotsTree);
+
+      final Map<Integer, Map<Integer, CounterSnapshots>> stageWorkerToSnapshots = counterSnapshotsTree.copyMap();
+      expectedStageWorkerChannelToCounters.forEach((stage, expectedWorkerChannelToCounters) -> {
+        final Map<Integer, CounterSnapshots> workerToCounters = stageWorkerToSnapshots.get(stage);
+        Assert.assertNotNull("No counters for stage " + stage, workerToCounters);
+
+        expectedWorkerChannelToCounters.forEach((worker, expectedChannelToCounters) -> {
+          CounterSnapshots counters = workerToCounters.get(worker);
+          Assert.assertNotNull(
+              StringUtils.format("No counters for stage [%d], worker [%d]", stage, worker),
+              counters
+          );
+
+          final Map<String, QueryCounterSnapshot> channelToCounters = counters.getMap();
+          expectedChannelToCounters.forEach(
+              (channel, counter) -> Assert.assertEquals(
+                  StringUtils.format(
+                      "Counter mismatch for stage [%d], worker [%d], channel [%s]",
+                      stage,
+                      worker,
+                      channel
+                  ),
+                  counter,
+                  channelToCounters.get(channel)
+              )
+          );
+        });
+      });
+    }
+
     protected void readyToRun()
     {
       if (!hasRun) {
@@ -943,7 +984,9 @@
         return;
       }
 
-      getPayloadOrThrow(controllerId);
+      MSQTaskReportPayload reportPayload = getPayloadOrThrow(controllerId);
+      verifyCounters(reportPayload.getCounters());
+
       MSQSpec foundSpec = indexingServiceClient.getQuerySpecForTask(controllerId);
       log.info(
           "found generated segments: %s",
@@ -1112,6 +1155,7 @@
       }
 
       MSQTaskReportPayload payload = getPayloadOrThrow(controllerId);
+      verifyCounters(payload.getCounters());
 
       if (payload.getStatus().getErrorReport() != null) {
         throw new ISE("Query %s failed due to %s", sql, payload.getStatus().getErrorReport().toString());
@@ -1163,4 +1207,5 @@ public class MSQTestBase extends BaseCalciteQueryTest
       }
     }
   }
+  }
 }