From d6949b1b79df67e6a031ceb074461585e48b1d27 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 16 Dec 2022 02:20:01 +0530 Subject: [PATCH] Track input processedBytes with MSQ ingestion (#13559) Follow up to #13520 Bytes processed are currently tracked for intermediate stages in MSQ ingestion. This patch adds the capability to track the bytes processed by an MSQ controller task while reading from an external input source or a segment source. Changes: - Track `processedBytes` for every `InputSource` read in `ExternalInputSliceReader` - Update `ChannelCounters` with the above obtained `processedBytes` when incrementing the input file count. - Update task report structure in docs The total input processed bytes can be obtained by summing the `processedBytes` as follows: totalBytes = 0 for every root stage (i.e. a stage which does not have another stage as an input): for every worker in that stage: for every input channel: (i.e. channels with prefix "input", e.g. "input0", "input1", etc.) 
totalBytes += processedBytes --- docs/ingestion/tasks.md | 13 ++- .../druid/msq/counters/ChannelCounters.java | 5 + .../external/ExternalInputSliceReader.java | 6 +- .../apache/druid/msq/exec/MSQInsertTest.java | 7 ++ .../apache/druid/msq/exec/MSQReplaceTest.java | 13 +++ .../apache/druid/msq/exec/MSQSelectTest.java | 10 +- .../msq/test/CounterSnapshotBuilder.java | 75 +++++++++++++ .../apache/druid/msq/test/MSQTestBase.java | 103 +++++++++++++----- 8 files changed, 200 insertions(+), 32 deletions(-) create mode 100644 extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CounterSnapshotBuilder.java diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md index 5afbadb3d43..4a059a75c63 100644 --- a/docs/ingestion/tasks.md +++ b/docs/ingestion/tasks.md @@ -67,12 +67,14 @@ An example output is shown below: "rowStats": { "determinePartitions": { "processed": 0, + "processedBytes": 0, "processedWithError": 0, "thrownAway": 0, "unparseable": 0 }, "buildSegments": { "processed": 5390324, + "processedBytes": 5109573212, "processedWithError": 0, "thrownAway": 0, "unparseable": 0 @@ -118,18 +120,21 @@ An example output is shown below: "buildSegments": { "5m": { "processed": 3.392158326408501, + "processedBytes": 627.5492903856, "unparseable": 0, "thrownAway": 0, "processedWithError": 0 }, "15m": { "processed": 1.736165476881023, + "processedBytes": 321.1906130223, "unparseable": 0, "thrownAway": 0, "processedWithError": 0 }, "1m": { "processed": 4.206417693750045, + "processedBytes": 778.1872733438, "unparseable": 0, "thrownAway": 0, "processedWithError": 0 @@ -139,6 +144,7 @@ An example output is shown below: "totals": { "buildSegments": { "processed": 1994, + "processedBytes": 3425110, "processedWithError": 0, "thrownAway": 0, "unparseable": 0 @@ -168,6 +174,7 @@ Only batch tasks have the DETERMINE_PARTITIONS phase. Realtime tasks such as tho the `rowStats` map contains information about row counts. There is one entry for each ingestion phase. 
The definitions of the different row counts are shown below: - `processed`: Number of rows successfully ingested without parsing errors +- `processedBytes`: Total number of uncompressed bytes processed by the task. This is the total byte size of all rows read, including those counted in `processedWithError`, `unparseable`, or `thrownAway`. - `processedWithError`: Number of rows that were ingested, but contained a parsing error within one or more columns. This typically occurs where input rows have a parseable structure but invalid types for columns, such as passing in a non-numeric String value for a numeric column. - `thrownAway`: Number of rows skipped. This includes rows with timestamps that were outside of the ingestion task's defined time interval and rows that were filtered out with a [`transformSpec`](./ingestion-spec.md#transformspec), but doesn't include the rows skipped by explicit user configurations. For example, the rows skipped by `skipHeaderRows` or `hasHeaderRow` in the CSV format are not counted. - `unparseable`: Number of rows that could not be parsed at all and were discarded. This tracks input rows without a parseable structure, such as passing in non-JSON data when using a JSON parser. @@ -188,24 +195,27 @@ http://:/druid/worker/v1/chat//rowStat An example report is shown below. The `movingAverages` section contains 1 minute, 5 minute, and 15 minute moving averages of increases to the four row counters, which have the same definitions as those in the completion report. The `totals` section shows the current totals. 
-``` +```json { "movingAverages": { "buildSegments": { "5m": { "processed": 3.392158326408501, + "processedBytes": 627.5492903856, "unparseable": 0, "thrownAway": 0, "processedWithError": 0 }, "15m": { "processed": 1.736165476881023, + "processedBytes": 321.1906130223, "unparseable": 0, "thrownAway": 0, "processedWithError": 0 }, "1m": { "processed": 4.206417693750045, + "processedBytes": 778.1872733438, "unparseable": 0, "thrownAway": 0, "processedWithError": 0 @@ -215,6 +225,7 @@ An example report is shown below. The `movingAverages` section contains 1 minute "totals": { "buildSegments": { "processed": 1994, + "processedBytes": 3425110, "processedWithError": 0, "thrownAway": 0, "unparseable": 0 diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/ChannelCounters.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/ChannelCounters.java index 461b066ce9d..3578fa21a6e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/ChannelCounters.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/ChannelCounters.java @@ -58,6 +58,11 @@ public class ChannelCounters implements QueryCounter add(NO_PARTITION, 1, 0, 0, 0); } + public void incrementBytes(long bytes) + { + add(NO_PARTITION, 0, bytes, 0, 0); + } + public void incrementFileCount() { add(NO_PARTITION, 0, 0, 0, 1); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java index 2c51f74c7c8..fe4e0b5a62b 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java @@ -26,6 +26,7 @@ import 
org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputStats; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.InlineInputSource; import org.apache.druid.data.input.impl.TimestampSpec; @@ -52,6 +53,7 @@ import org.apache.druid.msq.util.DimensionSchemaUtils; import org.apache.druid.segment.RowAdapters; import org.apache.druid.segment.RowBasedSegment; import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.incremental.SimpleRowIngestionMeters; import org.apache.druid.timeline.SegmentId; import java.io.File; @@ -140,6 +142,7 @@ public class ExternalInputSliceReader implements InputSliceReader final InputSourceReader reader; final boolean incrementCounters = isFileBasedInputSource(inputSource); + final InputStats inputStats = new SimpleRowIngestionMeters(); if (incrementCounters) { reader = new CountableInputSourceReader( inputSource.reader(schema, inputFormat, temporaryDirectory), @@ -159,7 +162,7 @@ public class ExternalInputSliceReader implements InputSliceReader public CloseableIterator make() { try { - CloseableIterator baseIterator = reader.read(); + CloseableIterator baseIterator = reader.read(inputStats); return new CloseableIterator() { private InputRow next = null; @@ -216,6 +219,7 @@ public class ExternalInputSliceReader implements InputSliceReader // has one file. 
if (incrementCounters) { channelCounters.incrementFileCount(); + channelCounters.incrementBytes(inputStats.getProcessedBytes()); } } catch (IOException e) { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java index cf4e4052d35..1790bb1137e 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java @@ -30,6 +30,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.msq.indexing.error.ColumnNameRestrictedFault; import org.apache.druid.msq.indexing.error.RowTooLargeFault; +import org.apache.druid.msq.test.CounterSnapshotBuilder; import org.apache.druid.msq.test.MSQTestBase; import org.apache.druid.msq.test.MSQTestFileUtils; import org.apache.druid.msq.util.MultiStageQueryContext; @@ -106,6 +107,12 @@ public class MSQInsertTest extends MSQTestBase 0 ))) .setExpectedResultRows(ImmutableList.of(new Object[]{1466985600000L, 20L})) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotBuilder + .with().rows(20).bytes(toRead.length()).files(1).totalFiles(1) + .buildChannelCounter(), + 0, 0, "input0" + ) .verifyResults(); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java index 49bb7f6d090..1cd8d66bbcb 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java @@ -22,6 +22,7 @@ package org.apache.druid.msq.exec; import com.google.common.collect.ImmutableList; import 
com.google.common.collect.ImmutableSet; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.msq.test.CounterSnapshotBuilder; import org.apache.druid.msq.test.MSQTestBase; import org.apache.druid.msq.test.MSQTestFileUtils; import org.apache.druid.segment.column.ColumnType; @@ -138,6 +139,12 @@ public class MSQReplaceTest extends MSQTestBase new Object[]{1466992800000L, 6L} ) ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotBuilder + .with().rows(20).bytes(toRead.length()).files(1).totalFiles(1) + .buildChannelCounter(), + 0, 0, "input0" + ) .verifyResults(); } @@ -176,6 +183,12 @@ public class MSQReplaceTest extends MSQTestBase new Object[]{1466989200000L, "Kolega2357"} ) ) + .setExpectedCountersForStageWorkerChannel( + CounterSnapshotBuilder + .with().rows(20).bytes(toRead.length()).files(1).totalFiles(1) + .buildChannelCounter(), + 0, 0, "input0" + ) .verifyResults(); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index b6305e13542..04d3a6a30c2 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -33,6 +33,7 @@ import org.apache.druid.msq.indexing.MSQSpec; import org.apache.druid.msq.indexing.MSQTuningConfig; import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault; import org.apache.druid.msq.shuffle.DurableStorageUtils; +import org.apache.druid.msq.test.CounterSnapshotBuilder; import org.apache.druid.msq.test.MSQTestBase; import org.apache.druid.msq.test.MSQTestFileUtils; import org.apache.druid.query.InlineDataSource; @@ -794,7 +795,14 @@ public class MSQSelectTest extends MSQTestBase ) )) .tuningConfig(MSQTuningConfig.defaultConfig()) - .build()) + .build() + ) + 
.setExpectedCountersForStageWorkerChannel( + CounterSnapshotBuilder + .with().rows(20).bytes(toRead.length()).files(1).totalFiles(1) + .buildChannelCounter(), + 0, 0, "input0" + ) .verifyResults(); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CounterSnapshotBuilder.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CounterSnapshotBuilder.java new file mode 100644 index 00000000000..0d89c0734ff --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CounterSnapshotBuilder.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.test; + +import org.apache.druid.msq.counters.ChannelCounters; +import org.apache.druid.msq.counters.QueryCounterSnapshot; + +/** + * Utility class to build instances of {@link QueryCounterSnapshot} used in tests. + */ +public class CounterSnapshotBuilder +{ + private long[] rows; + private long[] bytes; + private long[] frames; + private long[] files; + private long[] totalFiles; + + public static CounterSnapshotBuilder with() + { + return new CounterSnapshotBuilder(); + } + + public CounterSnapshotBuilder rows(long... 
rows) + { + this.rows = rows; + return this; + } + + public CounterSnapshotBuilder bytes(long... bytes) + { + this.bytes = bytes; + return this; + } + + public CounterSnapshotBuilder frames(long... frames) + { + this.frames = frames; + return this; + } + + public CounterSnapshotBuilder files(long... files) + { + this.files = files; + return this; + } + + public CounterSnapshotBuilder totalFiles(long... totalFiles) + { + this.totalFiles = totalFiles; + return this; + } + + public QueryCounterSnapshot buildChannelCounter() + { + return new ChannelCounters.Snapshot(rows, bytes, frames, files, totalFiles); + } +} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 43ae5e0b0c9..5dfe528fbaf 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -69,6 +69,9 @@ import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.metadata.input.InputSourceModule; +import org.apache.druid.msq.counters.CounterSnapshots; +import org.apache.druid.msq.counters.CounterSnapshotsTree; +import org.apache.druid.msq.counters.QueryCounterSnapshot; import org.apache.druid.msq.exec.Controller; import org.apache.druid.msq.exec.WorkerMemoryParameters; import org.apache.druid.msq.guice.MSQDurableStorageModule; @@ -168,7 +171,6 @@ import org.mockito.Mockito; import javax.annotation.Nonnull; import javax.annotation.Nullable; - import java.io.Closeable; import java.io.File; import java.io.IOException; @@ -176,6 +178,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import 
java.util.Map; import java.util.Optional; @@ -593,14 +596,7 @@ public class MSQTestBase extends BaseCalciteQueryTest }; segmentManager.addSegment(segment); } - return new Supplier>() - { - @Override - public Pair get() - { - return new Pair<>(segmentManager.getSegment(segmentId), Closer.create()); - } - }; + return () -> new Pair<>(segmentManager.getSegment(segmentId), Closer.create()); } public SelectTester testSelectQuery() @@ -745,7 +741,7 @@ public class MSQTestBase extends BaseCalciteQueryTest } } - public abstract class MSQTester> + public abstract class MSQTester> { protected String sql = null; protected Map queryContext = DEFAULT_MSQ_CONTEXT; @@ -758,78 +754,90 @@ public class MSQTestBase extends BaseCalciteQueryTest protected Matcher expectedExecutionErrorMatcher = null; protected MSQFault expectedMSQFault = null; protected Class expectedMSQFaultClass = null; + protected final Map>> + expectedStageWorkerChannelToCounters = new HashMap<>(); private boolean hasRun = false; - @SuppressWarnings("unchecked") public Builder setSql(String sql) { this.sql = sql; - return (Builder) this; + return asBuilder(); } - @SuppressWarnings("unchecked") public Builder setQueryContext(Map queryContext) { this.queryContext = queryContext; - return (Builder) this; + return asBuilder(); } - @SuppressWarnings("unchecked") public Builder setExpectedRowSignature(RowSignature expectedRowSignature) { Preconditions.checkArgument(!expectedRowSignature.equals(RowSignature.empty()), "Row signature cannot be empty"); this.expectedRowSignature = expectedRowSignature; - return (Builder) this; + return asBuilder(); } - @SuppressWarnings("unchecked") public Builder setExpectedSegment(Set expectedSegments) { Preconditions.checkArgument(!expectedSegments.isEmpty(), "Segments cannot be empty"); this.expectedSegments = expectedSegments; - return (Builder) this; + return asBuilder(); } - @SuppressWarnings("unchecked") public Builder setExpectedResultRows(List expectedResultRows) { 
Preconditions.checkArgument(expectedResultRows.size() > 0, "Results rows cannot be empty"); this.expectedResultRows = expectedResultRows; - return (Builder) this; + return asBuilder(); } - @SuppressWarnings("unchecked") public Builder setExpectedMSQSpec(MSQSpec expectedMSQSpec) { this.expectedMSQSpec = expectedMSQSpec; - return (Builder) this; + return asBuilder(); } - @SuppressWarnings("unchecked") public Builder setExpectedValidationErrorMatcher(Matcher expectedValidationErrorMatcher) { this.expectedValidationErrorMatcher = expectedValidationErrorMatcher; - return (Builder) this; + return asBuilder(); } - @SuppressWarnings("unchecked") public Builder setExpectedExecutionErrorMatcher(Matcher expectedExecutionErrorMatcher) { this.expectedExecutionErrorMatcher = expectedExecutionErrorMatcher; - return (Builder) this; + return asBuilder(); } - @SuppressWarnings("unchecked") public Builder setExpectedMSQFault(MSQFault MSQFault) { this.expectedMSQFault = MSQFault; - return (Builder) this; + return asBuilder(); } public Builder setExpectedMSQFaultClass(Class expectedMSQFaultClass) { this.expectedMSQFaultClass = expectedMSQFaultClass; + return asBuilder(); + } + + public Builder setExpectedCountersForStageWorkerChannel( + QueryCounterSnapshot counterSnapshot, + int stage, + int worker, + String channel + ) + { + this.expectedStageWorkerChannelToCounters.computeIfAbsent(stage, s -> new HashMap<>()) + .computeIfAbsent(worker, w -> new HashMap<>()) + .put(channel, counterSnapshot); + return asBuilder(); + } + + @SuppressWarnings("unchecked") + private Builder asBuilder() + { return (Builder) this; } @@ -847,6 +855,39 @@ public class MSQTestBase extends BaseCalciteQueryTest MatcherAssert.assertThat(e, expectedValidationErrorMatcher); } + protected void verifyCounters(CounterSnapshotsTree counterSnapshotsTree) + { + Assert.assertNotNull(counterSnapshotsTree); + + final Map> stageWorkerToSnapshots = counterSnapshotsTree.copyMap(); + 
expectedStageWorkerChannelToCounters.forEach((stage, expectedWorkerChannelToCounters) -> { + final Map workerToCounters = stageWorkerToSnapshots.get(stage); + Assert.assertNotNull("No counters for stage " + stage, workerToCounters); + + expectedWorkerChannelToCounters.forEach((worker, expectedChannelToCounters) -> { + CounterSnapshots counters = workerToCounters.get(worker); + Assert.assertNotNull( + StringUtils.format("No counters for stage [%d], worker [%d]", stage, worker), + counters + ); + + final Map channelToCounters = counters.getMap(); + expectedChannelToCounters.forEach( + (channel, counter) -> Assert.assertEquals( + StringUtils.format( + "Counter mismatch for stage [%d], worker [%d], channel [%s]", + stage, + worker, + channel + ), + counter, + channelToCounters.get(channel) + ) + ); + }); + }); + } + protected void readyToRun() { if (!hasRun) { @@ -943,7 +984,9 @@ public class MSQTestBase extends BaseCalciteQueryTest return; } - getPayloadOrThrow(controllerId); + MSQTaskReportPayload reportPayload = getPayloadOrThrow(controllerId); + verifyCounters(reportPayload.getCounters()); + MSQSpec foundSpec = indexingServiceClient.getQuerySpecForTask(controllerId); log.info( "found generated segments: %s", @@ -1112,6 +1155,7 @@ public class MSQTestBase extends BaseCalciteQueryTest } MSQTaskReportPayload payload = getPayloadOrThrow(controllerId); + verifyCounters(payload.getCounters()); if (payload.getStatus().getErrorReport() != null) { throw new ISE("Query %s failed due to %s", sql, payload.getStatus().getErrorReport().toString()); @@ -1163,4 +1207,5 @@ public class MSQTestBase extends BaseCalciteQueryTest } } } + }