Track input processedBytes with MSQ ingestion (#13559)

Follow-up to #13520

Processed bytes are currently tracked only for the intermediate stages of MSQ ingestion.
This patch adds the capability to also track the bytes processed by an MSQ controller
task while reading from an external input source or a segment source.

Changes:
- Track `processedBytes` for every `InputSource` read in `ExternalInputSliceReader`
- Update `ChannelCounters` with the obtained `processedBytes` when incrementing the input file count.
- Update task report structure in docs

The total input processed bytes can be obtained by summing the `processedBytes` as follows:

totalBytes = 0
for every root stage (i.e. a stage which does not have another stage as an input):
    for every worker in that stage:
        for every input channel (i.e. channels with the prefix "input", e.g. "input0", "input1", etc.):
            totalBytes += processedBytes
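
In Java, against the counter classes this patch touches, the walk looks roughly like the sketch below. This is a minimal sketch, not part of the patch: the caller is assumed to supply the root stage numbers (derivable from the query definition), and the `getBytes()` accessor on `ChannelCounters.Snapshot` is an assumed name used for illustration.

```java
import java.util.Map;
import java.util.Set;

import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.counters.CounterSnapshots;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.counters.QueryCounterSnapshot;

public class TotalInputBytes
{
  public static long sum(CounterSnapshotsTree counters, Set<Integer> rootStageNumbers)
  {
    long totalBytes = 0;
    // stage -> worker -> channel snapshots, the same shape MSQTestBase.verifyCounters() walks
    final Map<Integer, Map<Integer, CounterSnapshots>> stageWorkerToSnapshots = counters.copyMap();
    for (Map.Entry<Integer, Map<Integer, CounterSnapshots>> stageEntry : stageWorkerToSnapshots.entrySet()) {
      if (!rootStageNumbers.contains(stageEntry.getKey())) {
        continue; // only root stages read an external input source or a segment source
      }
      for (CounterSnapshots workerSnapshots : stageEntry.getValue().values()) {
        for (Map.Entry<String, QueryCounterSnapshot> channelEntry : workerSnapshots.getMap().entrySet()) {
          if (channelEntry.getKey().startsWith("input")) { // "input0", "input1", ...
            // Accessor name assumed; the snapshot holds one byte count per partition.
            final long[] bytes = ((ChannelCounters.Snapshot) channelEntry.getValue()).getBytes();
            if (bytes != null) {
              for (long b : bytes) {
                totalBytes += b;
              }
            }
          }
        }
      }
    }
    return totalBytes;
  }
}
```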
Authored by Kashif Faraz on 2022-12-16 02:20:01 +05:30; committed by GitHub
parent 431a1195ca
commit d6949b1b79
8 changed files with 200 additions and 32 deletions

Docs: task report structure

@@ -67,12 +67,14 @@ An example output is shown below:
"rowStats": {
"determinePartitions": {
"processed": 0,
"processedBytes": 0,
"processedWithError": 0,
"thrownAway": 0,
"unparseable": 0
},
"buildSegments": {
"processed": 5390324,
"processedBytes": 5109573212,
"processedWithError": 0,
"thrownAway": 0,
"unparseable": 0
@@ -118,18 +120,21 @@ An example output is shown below:
"buildSegments": {
"5m": {
"processed": 3.392158326408501,
"processedBytes": 627.5492903856,
"unparseable": 0,
"thrownAway": 0,
"processedWithError": 0
},
"15m": {
"processed": 1.736165476881023,
"processedBytes": 321.1906130223,
"unparseable": 0,
"thrownAway": 0,
"processedWithError": 0
},
"1m": {
"processed": 4.206417693750045,
"processedBytes": 778.1872733438,
"unparseable": 0,
"thrownAway": 0,
"processedWithError": 0
@@ -139,6 +144,7 @@ An example output is shown below:
"totals": {
"buildSegments": {
"processed": 1994,
"processedBytes": 3425110,
"processedWithError": 0,
"thrownAway": 0,
"unparseable": 0
@@ -168,6 +174,7 @@ Only batch tasks have the DETERMINE_PARTITIONS phase. Realtime tasks such as tho
the `rowStats` map contains information about row counts. There is one entry for each ingestion phase. The definitions of the different row counts are shown below:
- `processed`: Number of rows successfully ingested without parsing errors
- `processedBytes`: Total number of uncompressed bytes processed by the task. This reports the total byte size of all rows, including those counted in `processedWithError`, `unparseable`, or `thrownAway`.
- `processedWithError`: Number of rows that were ingested, but contained a parsing error within one or more columns. This typically occurs where input rows have a parseable structure but invalid types for columns, such as passing in a non-numeric String value for a numeric column.
- `thrownAway`: Number of rows skipped. This includes rows with timestamps that were outside of the ingestion task's defined time interval and rows that were filtered out with a [`transformSpec`](./ingestion-spec.md#transformspec), but doesn't include the rows skipped by explicit user configurations. For example, the rows skipped by `skipHeaderRows` or `hasHeaderRow` in the CSV format are not counted.
- `unparseable`: Number of rows that could not be parsed at all and were discarded. This tracks input rows without a parseable structure, such as passing in non-JSON data when using a JSON parser.
@@ -188,24 +195,27 @@ http://<middlemanager-host>:<worker-port>/druid/worker/v1/chat/<task-id>/rowStats
An example report is shown below. The `movingAverages` section contains 1 minute, 5 minute, and 15 minute moving averages of increases to the four row counters, which have the same definitions as those in the completion report. The `totals` section shows the current totals.
```json
{
"movingAverages": {
"buildSegments": {
"5m": {
"processed": 3.392158326408501,
"processedBytes": 627.5492903856,
"unparseable": 0,
"thrownAway": 0,
"processedWithError": 0
},
"15m": {
"processed": 1.736165476881023,
"processedBytes": 321.1906130223,
"unparseable": 0,
"thrownAway": 0,
"processedWithError": 0
},
"1m": {
"processed": 4.206417693750045,
"processedBytes": 778.1872733438,
"unparseable": 0,
"thrownAway": 0,
"processedWithError": 0
@@ -215,6 +225,7 @@ An example report is shown below. The `movingAverages` section contains 1 minute
"totals": {
"buildSegments": {
"processed": 1994,
"processedBytes": 3425110,
"processedWithError": 0,
"thrownAway": 0,
"unparseable": 0

ChannelCounters.java

@@ -58,6 +58,11 @@ public class ChannelCounters implements QueryCounter
add(NO_PARTITION, 1, 0, 0, 0);
}
public void incrementBytes(long bytes)
{
add(NO_PARTITION, 0, bytes, 0, 0);
}
public void incrementFileCount()
{
add(NO_PARTITION, 0, 0, 0, 1);

ExternalInputSliceReader.java

@@ -26,6 +26,7 @@ import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.data.input.impl.TimestampSpec;
@@ -52,6 +53,7 @@ import org.apache.druid.msq.util.DimensionSchemaUtils;
import org.apache.druid.segment.RowAdapters;
import org.apache.druid.segment.RowBasedSegment;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
import org.apache.druid.timeline.SegmentId;
import java.io.File;
@@ -140,6 +142,7 @@ public class ExternalInputSliceReader implements InputSliceReader
final InputSourceReader reader;
final boolean incrementCounters = isFileBasedInputSource(inputSource);
final InputStats inputStats = new SimpleRowIngestionMeters();
if (incrementCounters) {
reader = new CountableInputSourceReader(
inputSource.reader(schema, inputFormat, temporaryDirectory),
@@ -159,7 +162,7 @@ public class ExternalInputSliceReader implements InputSliceReader
public CloseableIterator<InputRow> make()
{
try {
CloseableIterator<InputRow> baseIterator = reader.read();
CloseableIterator<InputRow> baseIterator = reader.read(inputStats);
return new CloseableIterator<InputRow>()
{
private InputRow next = null;
@@ -216,6 +219,7 @@ public class ExternalInputSliceReader implements InputSliceReader
// has one file.
if (incrementCounters) {
channelCounters.incrementFileCount();
channelCounters.incrementBytes(inputStats.getProcessedBytes());
}
}
catch (IOException e) {
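
The shape of the change above: an `InputStats` accumulator is threaded through `reader.read(inputStats)`, collects bytes as rows are consumed, and is flushed into the channel counters once a file's iterator is exhausted, right next to the existing file-count increment. A minimal self-contained sketch of that accumulate-then-flush pattern, with simplified stand-in types rather than the Druid classes:

```java
import java.util.Iterator;
import java.util.List;

// Stand-in for InputStats: accumulates bytes as rows are read.
class ByteStats
{
  private long processedBytes;

  void addBytes(long n)
  {
    processedBytes += n;
  }

  long getProcessedBytes()
  {
    return processedBytes;
  }
}

// Stand-in for ChannelCounters: receives the flushed totals.
class Counters
{
  long files;
  long bytes;

  void incrementFileCount()
  {
    files++;
  }

  void incrementBytes(long bytes)
  {
    this.bytes += bytes;
  }
}

public class AccumulateThenFlush
{
  public static void main(String[] args)
  {
    List<String> rows = List.of("row1", "row22", "row333");
    ByteStats stats = new ByteStats();
    Counters counters = new Counters();

    // The reader updates the accumulator as each row is consumed ...
    Iterator<String> reader = rows.iterator();
    while (reader.hasNext()) {
      stats.addBytes(reader.next().length());
    }

    // ... and once the file is exhausted, the running total is flushed
    // alongside the file-count increment, mirroring the diff above.
    counters.incrementFileCount();
    counters.incrementBytes(stats.getProcessedBytes());

    System.out.println("files=" + counters.files + ", bytes=" + counters.bytes); // files=1, bytes=15
  }
}
```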

MSQInsertTest.java

@@ -30,6 +30,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.msq.indexing.error.ColumnNameRestrictedFault;
import org.apache.druid.msq.indexing.error.RowTooLargeFault;
import org.apache.druid.msq.test.CounterSnapshotBuilder;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestFileUtils;
import org.apache.druid.msq.util.MultiStageQueryContext;
@@ -106,6 +107,12 @@ public class MSQInsertTest extends MSQTestBase
0
)))
.setExpectedResultRows(ImmutableList.of(new Object[]{1466985600000L, 20L}))
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotBuilder
.with().rows(20).bytes(toRead.length()).files(1).totalFiles(1)
.buildChannelCounter(),
0, 0, "input0"
)
.verifyResults();
}

MSQReplaceTest.java

@@ -22,6 +22,7 @@ package org.apache.druid.msq.exec;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.msq.test.CounterSnapshotBuilder;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestFileUtils;
import org.apache.druid.segment.column.ColumnType;
@@ -138,6 +139,12 @@ public class MSQReplaceTest extends MSQTestBase
new Object[]{1466992800000L, 6L}
)
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotBuilder
.with().rows(20).bytes(toRead.length()).files(1).totalFiles(1)
.buildChannelCounter(),
0, 0, "input0"
)
.verifyResults();
}
@@ -176,6 +183,12 @@ public class MSQReplaceTest extends MSQTestBase
new Object[]{1466989200000L, "Kolega2357"}
)
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotBuilder
.with().rows(20).bytes(toRead.length()).files(1).totalFiles(1)
.buildChannelCounter(),
0, 0, "input0"
)
.verifyResults();
}

MSQSelectTest.java

@@ -33,6 +33,7 @@ import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.MSQTuningConfig;
import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault;
import org.apache.druid.msq.shuffle.DurableStorageUtils;
import org.apache.druid.msq.test.CounterSnapshotBuilder;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestFileUtils;
import org.apache.druid.query.InlineDataSource;
@@ -794,7 +795,14 @@ public class MSQSelectTest extends MSQTestBase
)
))
.tuningConfig(MSQTuningConfig.defaultConfig())
.build())
.build()
)
.setExpectedCountersForStageWorkerChannel(
CounterSnapshotBuilder
.with().rows(20).bytes(toRead.length()).files(1).totalFiles(1)
.buildChannelCounter(),
0, 0, "input0"
)
.verifyResults();
}

CounterSnapshotBuilder.java (new file)

@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.test;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.counters.QueryCounterSnapshot;
/**
* Utility class to build instances of {@link QueryCounterSnapshot} used in tests.
*/
public class CounterSnapshotBuilder
{
private long[] rows;
private long[] bytes;
private long[] frames;
private long[] files;
private long[] totalFiles;
public static CounterSnapshotBuilder with()
{
return new CounterSnapshotBuilder();
}
public CounterSnapshotBuilder rows(long... rows)
{
this.rows = rows;
return this;
}
public CounterSnapshotBuilder bytes(long... bytes)
{
this.bytes = bytes;
return this;
}
public CounterSnapshotBuilder frames(long... frames)
{
this.frames = frames;
return this;
}
public CounterSnapshotBuilder files(long... files)
{
this.files = files;
return this;
}
public CounterSnapshotBuilder totalFiles(long... totalFiles)
{
this.totalFiles = totalFiles;
return this;
}
public QueryCounterSnapshot buildChannelCounter()
{
return new ChannelCounters.Snapshot(rows, bytes, frames, files, totalFiles);
}
}
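
As the test diffs above show, the builder reads fluently at the call site; each setter takes varargs, so multi-partition channels can be expressed by passing one value per partition, and fields never set remain null in the resulting snapshot. For example, taken from the tests (`toRead` is the external file the test ingests):

```java
// Expected counter for one channel: 20 rows, the file's byte size, 1 of 1 files read.
QueryCounterSnapshot expected = CounterSnapshotBuilder
    .with().rows(20).bytes(toRead.length()).files(1).totalFiles(1)
    .buildChannelCounter();
```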

MSQTestBase.java

@@ -69,6 +69,9 @@ import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.metadata.input.InputSourceModule;
import org.apache.druid.msq.counters.CounterSnapshots;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.counters.QueryCounterSnapshot;
import org.apache.druid.msq.exec.Controller;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.guice.MSQDurableStorageModule;
@@ -168,7 +171,6 @@ import org.mockito.Mockito;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
@@ -176,6 +178,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -593,14 +596,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
};
segmentManager.addSegment(segment);
}
return new Supplier<Pair<Segment, Closeable>>()
{
@Override
public Pair<Segment, Closeable> get()
{
return new Pair<>(segmentManager.getSegment(segmentId), Closer.create());
}
};
return () -> new Pair<>(segmentManager.getSegment(segmentId), Closer.create());
}
public SelectTester testSelectQuery()
@@ -745,7 +741,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
}
}
public abstract class MSQTester<Builder extends MSQTester<?>>
public abstract class MSQTester<Builder extends MSQTester<Builder>>
{
protected String sql = null;
protected Map<String, Object> queryContext = DEFAULT_MSQ_CONTEXT;
@@ -758,78 +754,90 @@ public class MSQTestBase extends BaseCalciteQueryTest
protected Matcher<Throwable> expectedExecutionErrorMatcher = null;
protected MSQFault expectedMSQFault = null;
protected Class<? extends MSQFault> expectedMSQFaultClass = null;
protected final Map<Integer, Map<Integer, Map<String, QueryCounterSnapshot>>>
expectedStageWorkerChannelToCounters = new HashMap<>();
private boolean hasRun = false;
@SuppressWarnings("unchecked")
public Builder setSql(String sql)
{
this.sql = sql;
return (Builder) this;
return asBuilder();
}
@SuppressWarnings("unchecked")
public Builder setQueryContext(Map<String, Object> queryContext)
{
this.queryContext = queryContext;
return (Builder) this;
return asBuilder();
}
@SuppressWarnings("unchecked")
public Builder setExpectedRowSignature(RowSignature expectedRowSignature)
{
Preconditions.checkArgument(!expectedRowSignature.equals(RowSignature.empty()), "Row signature cannot be empty");
this.expectedRowSignature = expectedRowSignature;
return (Builder) this;
return asBuilder();
}
@SuppressWarnings("unchecked")
public Builder setExpectedSegment(Set<SegmentId> expectedSegments)
{
Preconditions.checkArgument(!expectedSegments.isEmpty(), "Segments cannot be empty");
this.expectedSegments = expectedSegments;
return (Builder) this;
return asBuilder();
}
@SuppressWarnings("unchecked")
public Builder setExpectedResultRows(List<Object[]> expectedResultRows)
{
Preconditions.checkArgument(expectedResultRows.size() > 0, "Results rows cannot be empty");
this.expectedResultRows = expectedResultRows;
return (Builder) this;
return asBuilder();
}
@SuppressWarnings("unchecked")
public Builder setExpectedMSQSpec(MSQSpec expectedMSQSpec)
{
this.expectedMSQSpec = expectedMSQSpec;
return (Builder) this;
return asBuilder();
}
@SuppressWarnings("unchecked")
public Builder setExpectedValidationErrorMatcher(Matcher<Throwable> expectedValidationErrorMatcher)
{
this.expectedValidationErrorMatcher = expectedValidationErrorMatcher;
return (Builder) this;
return asBuilder();
}
@SuppressWarnings("unchecked")
public Builder setExpectedExecutionErrorMatcher(Matcher<Throwable> expectedExecutionErrorMatcher)
{
this.expectedExecutionErrorMatcher = expectedExecutionErrorMatcher;
return (Builder) this;
return asBuilder();
}
@SuppressWarnings("unchecked")
public Builder setExpectedMSQFault(MSQFault MSQFault)
{
this.expectedMSQFault = MSQFault;
return (Builder) this;
return asBuilder();
}
public Builder setExpectedMSQFaultClass(Class<? extends MSQFault> expectedMSQFaultClass)
{
this.expectedMSQFaultClass = expectedMSQFaultClass;
return asBuilder();
}
public Builder setExpectedCountersForStageWorkerChannel(
QueryCounterSnapshot counterSnapshot,
int stage,
int worker,
String channel
)
{
this.expectedStageWorkerChannelToCounters.computeIfAbsent(stage, s -> new HashMap<>())
.computeIfAbsent(worker, w -> new HashMap<>())
.put(channel, counterSnapshot);
return asBuilder();
}
@SuppressWarnings("unchecked")
private Builder asBuilder()
{
return (Builder) this;
}
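
The change from `MSQTester<?>` to `MSQTester<Builder>` together with the single private `asBuilder()` is the self-typed (recursive generic) builder pattern: every setter returns the concrete subclass type, so subclass-specific methods can be chained after the inherited setters, and the unchecked cast is confined to one method. A minimal sketch of the pattern with stand-in classes:

```java
// Stand-ins illustrating the self-typed builder pattern; not the Druid classes.
abstract class Tester<B extends Tester<B>>
{
  protected String sql;

  public B setSql(String sql)
  {
    this.sql = sql;
    return self();
  }

  @SuppressWarnings("unchecked")
  private B self()
  {
    // The one unchecked cast, valid as long as subclasses declare
    // themselves as the type parameter: class X extends Tester<X>.
    return (B) this;
  }
}

class SelectTester extends Tester<SelectTester>
{
  public SelectTester verifyResults()
  {
    System.out.println("running: " + sql);
    return this;
  }
}

public class Demo
{
  public static void main(String[] args)
  {
    // setSql(...) returns SelectTester, so verifyResults() chains directly.
    new SelectTester().setSql("SELECT 1").verifyResults();
  }
}
```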
@@ -847,6 +855,39 @@ public class MSQTestBase extends BaseCalciteQueryTest
MatcherAssert.assertThat(e, expectedValidationErrorMatcher);
}
protected void verifyCounters(CounterSnapshotsTree counterSnapshotsTree)
{
Assert.assertNotNull(counterSnapshotsTree);
final Map<Integer, Map<Integer, CounterSnapshots>> stageWorkerToSnapshots = counterSnapshotsTree.copyMap();
expectedStageWorkerChannelToCounters.forEach((stage, expectedWorkerChannelToCounters) -> {
final Map<Integer, CounterSnapshots> workerToCounters = stageWorkerToSnapshots.get(stage);
Assert.assertNotNull("No counters for stage " + stage, workerToCounters);
expectedWorkerChannelToCounters.forEach((worker, expectedChannelToCounters) -> {
CounterSnapshots counters = workerToCounters.get(worker);
Assert.assertNotNull(
StringUtils.format("No counters for stage [%d], worker [%d]", stage, worker),
counters
);
final Map<String, QueryCounterSnapshot> channelToCounters = counters.getMap();
expectedChannelToCounters.forEach(
(channel, counter) -> Assert.assertEquals(
StringUtils.format(
"Counter mismatch for stage [%d], worker [%d], channel [%s]",
stage,
worker,
channel
),
counter,
channelToCounters.get(channel)
)
);
});
});
}
protected void readyToRun()
{
if (!hasRun) {
@@ -943,7 +984,9 @@ public class MSQTestBase extends BaseCalciteQueryTest
return;
}
getPayloadOrThrow(controllerId);
MSQTaskReportPayload reportPayload = getPayloadOrThrow(controllerId);
verifyCounters(reportPayload.getCounters());
MSQSpec foundSpec = indexingServiceClient.getQuerySpecForTask(controllerId);
log.info(
"found generated segments: %s",
@@ -1112,6 +1155,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
}
MSQTaskReportPayload payload = getPayloadOrThrow(controllerId);
verifyCounters(payload.getCounters());
if (payload.getStatus().getErrorReport() != null) {
throw new ISE("Query %s failed due to %s", sql, payload.getStatus().getErrorReport().toString());
@@ -1163,4 +1207,5 @@
}
}
}
}