Add segment generator counters to MSQ reports (#13909)

* Add segment generator counters to reports

* Remove unneeded annotation

* Fix checkstyle and coverage

* Add persist and merged as new metrics

* Address review comments

* Fix checkstyle

* Create metrics class to handle updating counters

* Address review comments

* Add rowsPushed as a new metrics
This commit is contained in:
Adarsh Sanjeev 2023-03-22 21:47:26 +05:30 committed by GitHub
parent f4392a3155
commit 7bab407495
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 429 additions and 13 deletions

View File

@ -253,7 +253,9 @@ The response shows an example report for a query.
"status": {
"status": "SUCCESS",
"startTime": "2022-09-14T22:12:09.266Z",
"durationMs": 28227
"durationMs": 28227,
"pendingTasks": 0,
"runningTasks": 2
},
"stages": [
{
@ -528,6 +530,12 @@ The response shows an example report for a query.
"frames": [
73
]
},
"segmentGenerationProgress": {
"type": "segmentGenerationProgress",
"rowsProcessed": 465346,
"rowsPersisted": 465346,
"rowsMerged": 465346
}
}
}

View File

@ -35,6 +35,7 @@ public class CounterNames
private static final String OUTPUT = "output";
private static final String SHUFFLE = "shuffle";
private static final String SORT_PROGRESS = "sortProgress";
private static final String SEGMENT_GENERATION_PROGRESS = "segmentGenerationProgress";
private static final String WARNINGS = "warnings";
private static final Comparator<String> COMPARATOR = new NameComparator();
@ -83,6 +84,14 @@ public class CounterNames
return WARNINGS;
}
/**
* Standard name for segment generation counter created by {@link CounterTracker#segmentGenerationProgress()}.
*/
public static String getSegmentGenerationProgress()
{
return SEGMENT_GENERATION_PROGRESS;
}
/**
* Standard comparator for counter names. Not necessary for functionality, but helps with human-readability.
*/

View File

@ -47,6 +47,11 @@ public class CounterTracker
return counter(CounterNames.sortProgress(), SuperSorterProgressTrackerCounter::new).tracker();
}
public SegmentGenerationProgressCounter segmentGenerationProgress()
{
return counter(CounterNames.getSegmentGenerationProgress(), SegmentGenerationProgressCounter::new);
}
public WarningCounters warnings()
{
return counter(CounterNames.warnings(), WarningCounters::new);

View File

@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.counters;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import javax.annotation.Nullable;
import java.util.Objects;
/**
* Counters for segment generation phase. Created by {@link CounterTracker#segmentGenerationProgress()}.
*/
public class SegmentGenerationProgressCounter implements QueryCounter
{
// Number of rows processed by the segment generator as input, but not yet persisted.
@GuardedBy("this")
private long rowsProcessed = 0L;
// Number of rows persisted by the segment generator as a queryable index.
@GuardedBy("this")
private long rowsPersisted = 0L;
// Number of rows that have been merged into a single file from the queryable indexes, prior to the push to deep storage.
@GuardedBy("this")
private long rowsMerged = 0L;
// Number of rows in segments that have been pushed to deep storage.
@GuardedBy("this")
private long rowsPushed = 0L;
public void incrementRowsProcessed(long rowsProcessed)
{
synchronized (this) {
this.rowsProcessed += rowsProcessed;
}
}
public void incrementRowsPersisted(long rowsPersisted)
{
synchronized (this) {
this.rowsPersisted += rowsPersisted;
}
}
public void incrementRowsMerged(long rowsMerged)
{
synchronized (this) {
this.rowsMerged += rowsMerged;
}
}
public void incrementRowsPushed(long rowsPushed)
{
synchronized (this) {
this.rowsPushed += rowsPushed;
}
}
@Override
@Nullable
public QueryCounterSnapshot snapshot()
{
synchronized (this) {
return new Snapshot(rowsProcessed, rowsPersisted, rowsMerged, rowsPushed);
}
}
@JsonTypeName("segmentGenerationProgress")
public static class Snapshot implements QueryCounterSnapshot
{
private final long rowsProcessed;
private final long rowsPersisted;
private final long rowsMerged;
private final long rowsPushed;
@JsonCreator
public Snapshot(
@JsonProperty("rowsProcessed") final long rowsProcessed,
@JsonProperty("rowsPersisted") final long rowsPersisted,
@JsonProperty("rowsMerged") final long rowsMerged,
@JsonProperty("rowsPushed") final long rowsPushed
)
{
this.rowsProcessed = rowsProcessed;
this.rowsPersisted = rowsPersisted;
this.rowsMerged = rowsMerged;
this.rowsPushed = rowsPushed;
}
@JsonProperty(value = "rowsProcessed")
public long getRowsProcessed()
{
return rowsProcessed;
}
@JsonProperty(value = "rowsPersisted")
public long getRowsPersisted()
{
return rowsPersisted;
}
@JsonProperty(value = "rowsMerged")
public long getRowsMerged()
{
return rowsMerged;
}
@JsonProperty(value = "rowsPushed")
public long getRowsPushed()
{
return rowsPushed;
}
@Override
public String toString()
{
return "Snapshot{" +
"rowsProcessed=" + rowsProcessed +
", rowsPersisted=" + rowsPersisted +
", rowsMerged=" + rowsMerged +
", rowsPushed=" + rowsPushed +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Snapshot snapshot = (Snapshot) o;
return rowsProcessed == snapshot.rowsProcessed
&& rowsPersisted == snapshot.rowsPersisted
&& rowsMerged == snapshot.rowsMerged
&& rowsPushed == snapshot.rowsPushed;
}
@Override
public int hashCode()
{
return Objects.hash(rowsProcessed, rowsPersisted, rowsMerged, rowsPushed);
}
}
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.counters;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
/**
* Wrapper around {@link FireDepartmentMetrics} which updates the progress counters while updating its metrics. This
* is necessary as the {@link org.apache.druid.segment.realtime.appenderator.BatchAppenderator} used by the
* {@link org.apache.druid.msq.indexing.SegmentGeneratorFrameProcessor} is not part of the MSQ extension, and hence,
* cannot update the counters used in MSQ reports as it persists and pushes segments to deep storage.
*/
public class SegmentGeneratorMetricsWrapper extends FireDepartmentMetrics
{
private final SegmentGenerationProgressCounter segmentGenerationProgressCounter;
public SegmentGeneratorMetricsWrapper(SegmentGenerationProgressCounter segmentGenerationProgressCounter)
{
this.segmentGenerationProgressCounter = segmentGenerationProgressCounter;
}
@Override
public void incrementRowOutputCount(long numRows)
{
super.incrementRowOutputCount(numRows);
segmentGenerationProgressCounter.incrementRowsPersisted(numRows);
}
@Override
public void incrementMergedRows(long rows)
{
super.incrementMergedRows(rows);
segmentGenerationProgressCounter.incrementRowsMerged(rows);
}
@Override
public void incrementPushedRows(long rows)
{
super.incrementPushedRows(rows);
segmentGenerationProgressCounter.incrementRowsPushed(rows);
}
}

View File

@ -31,6 +31,7 @@ import org.apache.druid.guice.annotations.Self;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.counters.CounterSnapshotsSerializer;
import org.apache.druid.msq.counters.SegmentGenerationProgressCounter;
import org.apache.druid.msq.counters.SuperSorterProgressTrackerCounter;
import org.apache.druid.msq.counters.WarningCounters;
import org.apache.druid.msq.indexing.MSQControllerTask;
@ -167,6 +168,7 @@ public class MSQIndexingModule implements DruidModule
ChannelCounters.Snapshot.class,
SuperSorterProgressTrackerCounter.Snapshot.class,
WarningCounters.Snapshot.class,
SegmentGenerationProgressCounter.Snapshot.class,
// InputSpec classes
ExternalInputSpec.class,

View File

@ -41,6 +41,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.counters.SegmentGenerationProgressCounter;
import org.apache.druid.msq.exec.MSQTasks;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.util.SequenceUtils;
@ -74,7 +75,7 @@ public class SegmentGeneratorFrameProcessor implements FrameProcessor<DataSegmen
private final SegmentIdWithShardSpec segmentIdWithShardSpec;
private final List<String> dimensionsForInputRows;
private final Object2IntMap<String> outputColumnNameToFrameColumnNumberMap;
private final SegmentGenerationProgressCounter segmentGenerationProgressCounter;
private boolean firstRun = true;
private long rowsWritten = 0L;
@ -83,7 +84,8 @@ public class SegmentGeneratorFrameProcessor implements FrameProcessor<DataSegmen
final ColumnMappings columnMappings,
final List<String> dimensionsForInputRows,
final Appenderator appenderator,
final SegmentIdWithShardSpec segmentIdWithShardSpec
final SegmentIdWithShardSpec segmentIdWithShardSpec,
final SegmentGenerationProgressCounter segmentGenerationProgressCounter
)
{
this.inChannel = readableInput.getChannel();
@ -91,6 +93,7 @@ public class SegmentGeneratorFrameProcessor implements FrameProcessor<DataSegmen
this.appenderator = appenderator;
this.segmentIdWithShardSpec = segmentIdWithShardSpec;
this.dimensionsForInputRows = dimensionsForInputRows;
this.segmentGenerationProgressCounter = segmentGenerationProgressCounter;
outputColumnNameToFrameColumnNumberMap = new Object2IntOpenHashMap<>();
outputColumnNameToFrameColumnNumberMap.defaultReturnValue(-1);
@ -203,6 +206,7 @@ public class SegmentGeneratorFrameProcessor implements FrameProcessor<DataSegmen
try {
rowsWritten++;
appenderator.add(segmentIdWithShardSpec, inputRow, null);
segmentGenerationProgressCounter.incrementRowsProcessed(1);
}
catch (Exception e) {
throw new RuntimeException(e);

View File

@ -35,6 +35,8 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.msq.counters.CounterTracker;
import org.apache.druid.msq.counters.SegmentGenerationProgressCounter;
import org.apache.druid.msq.counters.SegmentGeneratorMetricsWrapper;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSliceReader;
@ -53,7 +55,6 @@ import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
import org.apache.druid.segment.realtime.appenderator.Appenderators;
@ -147,6 +148,8 @@ public class SegmentGeneratorFrameProcessorFactory
}
}
));
final SegmentGenerationProgressCounter segmentGenerationProgressCounter = counters.segmentGenerationProgress();
final SegmentGeneratorMetricsWrapper segmentGeneratorMetricsWrapper = new SegmentGeneratorMetricsWrapper(segmentGenerationProgressCounter);
final Sequence<SegmentGeneratorFrameProcessor> workers = inputSequence.map(
readableInputPair -> {
@ -170,7 +173,7 @@ public class SegmentGeneratorFrameProcessorFactory
persistDirectory,
frameContext.memoryParameters()
),
new FireDepartmentMetrics(), // We should eventually expose the FireDepartmentMetrics
segmentGeneratorMetricsWrapper,
frameContext.segmentPusher(),
frameContext.jsonMapper(),
frameContext.indexIO(),
@ -185,7 +188,8 @@ public class SegmentGeneratorFrameProcessorFactory
columnMappings,
dataSchema.getDimensionsSpec().getDimensionNames(),
appenderator,
segmentIdWithShardSpec
segmentIdWithShardSpec,
segmentGenerationProgressCounter
);
}
);

View File

@ -183,7 +183,7 @@ public class MSQTaskQueryMaker implements QueryMaker
final List<ColumnMapping> columnMappings = new ArrayList<>();
for (final Pair<Integer, String> entry : fieldMapping) {
// Note: SQL generally allows output columns to be duplicates, but MultiStageQueryMakerFactory.validateNoDuplicateAliases
// Note: SQL generally allows output columns to be duplicates, but MSQTaskSqlEngine.validateNoDuplicateAliases
// will prevent duplicate output columns from appearing here. So no need to worry about it.
final String queryColumn = druidQuery.getOutputRowSignature().getColumnName(entry.getKey());

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.counters;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.junit.Test;
public class SegmentGenerationProgressCounterTest
{
@Test
public void testEquals()
{
EqualsVerifier.forClass(SegmentGenerationProgressCounter.Snapshot.class)
.usingGetClass()
.withNonnullFields("rowsProcessed", "rowsPersisted", "rowsMerged", "rowsPushed")
.verify();
}
}

View File

@ -128,6 +128,11 @@ public class MSQInsertTest extends MSQTestBase
.with().rows(expectedArray).frames(expectedArray),
2, 0, "input0"
)
.setExpectedSegmentGenerationProgressCountersForStageWorker(
CounterSnapshotMatcher
.with().segmentRowsProcessed(Arrays.stream(expectedArray).sum()),
2, 0
)
.verifyResults();
}
@ -188,6 +193,11 @@ public class MSQInsertTest extends MSQTestBase
.with().rows(1).frames(1),
2, 0, "input0"
)
.setExpectedSegmentGenerationProgressCountersForStageWorker(
CounterSnapshotMatcher
.with().segmentRowsProcessed(1),
2, 0
)
.verifyResults();
}
@ -362,6 +372,11 @@ public class MSQInsertTest extends MSQTestBase
.with().rows(expectedArray).frames(expectedArray),
2, 0, "input0"
)
.setExpectedSegmentGenerationProgressCountersForStageWorker(
CounterSnapshotMatcher
.with().segmentRowsProcessed(Arrays.stream(expectedArray).sum()),
2, 0
)
.verifyResults();
}
@ -539,6 +554,11 @@ public class MSQInsertTest extends MSQTestBase
.with().rows(expectedArray).frames(expectedArray),
2, 0, "input0"
)
.setExpectedSegmentGenerationProgressCountersForStageWorker(
CounterSnapshotMatcher
.with().segmentRowsProcessed(Arrays.stream(expectedArray).sum()),
2, 0
)
.verifyResults();
}
@ -591,6 +611,11 @@ public class MSQInsertTest extends MSQTestBase
.with().rows(expectedArray).frames(expectedArray),
2, 0, "input0"
)
.setExpectedSegmentGenerationProgressCountersForStageWorker(
CounterSnapshotMatcher
.with().segmentRowsProcessed(Arrays.stream(expectedArray).sum()),
2, 0
)
.verifyResults();
}
@ -703,6 +728,11 @@ public class MSQInsertTest extends MSQTestBase
.with().rows(1).frames(1),
2, 0, "input0"
)
.setExpectedSegmentGenerationProgressCountersForStageWorker(
CounterSnapshotMatcher
.with().segmentRowsProcessed(1),
2, 0
)
.verifyResults();
}
@ -773,6 +803,11 @@ public class MSQInsertTest extends MSQTestBase
.with().rows(6).frames(1),
2, 0, "input0"
)
.setExpectedSegmentGenerationProgressCountersForStageWorker(
CounterSnapshotMatcher
.with().segmentRowsProcessed(6),
2, 0
)
.verifyResults();
}

View File

@ -126,6 +126,11 @@ public class MSQReplaceTest extends MSQTestBase
.with().rows(1, 1, 1, 1, 1, 1).frames(1, 1, 1, 1, 1, 1),
1, 0, "input0"
)
.setExpectedSegmentGenerationProgressCountersForStageWorker(
CounterSnapshotMatcher
.with().segmentRowsProcessed(6),
1, 0
)
.verifyResults();
}
@ -170,6 +175,11 @@ public class MSQReplaceTest extends MSQTestBase
.with().rows(1).frames(1),
1, 0, "input0"
)
.setExpectedSegmentGenerationProgressCountersForStageWorker(
CounterSnapshotMatcher
.with().segmentRowsProcessed(1),
1, 0
)
.verifyResults();
}
@ -306,6 +316,11 @@ public class MSQReplaceTest extends MSQTestBase
.with().rows(4).frames(1),
1, 0, "input0"
)
.setExpectedSegmentGenerationProgressCountersForStageWorker(
CounterSnapshotMatcher
.with().segmentRowsProcessed(4),
1, 0
)
.verifyResults();
}
@ -374,6 +389,11 @@ public class MSQReplaceTest extends MSQTestBase
.with().rows(6).frames(1),
1, 0, "input0"
)
.setExpectedSegmentGenerationProgressCountersForStageWorker(
CounterSnapshotMatcher
.with().segmentRowsProcessed(6),
1, 0
)
.verifyResults();
}
@ -430,6 +450,11 @@ public class MSQReplaceTest extends MSQTestBase
.with().rows(3, 3).frames(1, 1),
1, 0, "input0"
)
.setExpectedSegmentGenerationProgressCountersForStageWorker(
CounterSnapshotMatcher
.with().segmentRowsProcessed(6),
1, 0
)
.verifyResults();
}
@ -484,6 +509,11 @@ public class MSQReplaceTest extends MSQTestBase
.with().rows(2).frames(1),
1, 0, "input0"
)
.setExpectedSegmentGenerationProgressCountersForStageWorker(
CounterSnapshotMatcher
.with().segmentRowsProcessed(2),
1, 0
)
.verifyResults();
}
@ -533,6 +563,11 @@ public class MSQReplaceTest extends MSQTestBase
.with().rows(2).frames(1),
1, 0, "input0"
)
.setExpectedSegmentGenerationProgressCountersForStageWorker(
CounterSnapshotMatcher
.with().segmentRowsProcessed(2),
1, 0
)
.verifyResults();
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.msq.test;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.counters.QueryCounterSnapshot;
import org.apache.druid.msq.counters.SegmentGenerationProgressCounter;
import org.junit.Assert;
/**
@ -33,6 +34,7 @@ public class CounterSnapshotMatcher
private long[] frames;
private long[] files;
private long[] totalFiles;
private Long segmentRowsProcessed;
public static CounterSnapshotMatcher with()
{
@ -44,6 +46,11 @@ public class CounterSnapshotMatcher
this.rows = rows;
return this;
}
public CounterSnapshotMatcher segmentRowsProcessed(long segmentRowsProcessed)
{
this.segmentRowsProcessed = segmentRowsProcessed;
return this;
}
public CounterSnapshotMatcher bytes(long... bytes)
{
@ -75,21 +82,23 @@ public class CounterSnapshotMatcher
*/
public void matchQuerySnapshot(String errorMessageFormat, QueryCounterSnapshot queryCounterSnapshot)
{
ChannelCounters.Snapshot channelCountersSnapshot = (ChannelCounters.Snapshot) queryCounterSnapshot;
if (rows != null) {
Assert.assertArrayEquals(errorMessageFormat, rows, channelCountersSnapshot.getRows());
Assert.assertArrayEquals(errorMessageFormat, rows, ((ChannelCounters.Snapshot) queryCounterSnapshot).getRows());
}
if (bytes != null) {
Assert.assertArrayEquals(errorMessageFormat, bytes, channelCountersSnapshot.getBytes());
Assert.assertArrayEquals(errorMessageFormat, bytes, ((ChannelCounters.Snapshot) queryCounterSnapshot).getBytes());
}
if (frames != null) {
Assert.assertArrayEquals(errorMessageFormat, frames, channelCountersSnapshot.getFrames());
Assert.assertArrayEquals(errorMessageFormat, frames, ((ChannelCounters.Snapshot) queryCounterSnapshot).getFrames());
}
if (files != null) {
Assert.assertArrayEquals(errorMessageFormat, files, channelCountersSnapshot.getFiles());
Assert.assertArrayEquals(errorMessageFormat, files, ((ChannelCounters.Snapshot) queryCounterSnapshot).getFiles());
}
if (totalFiles != null) {
Assert.assertArrayEquals(errorMessageFormat, totalFiles, channelCountersSnapshot.getTotalFiles());
Assert.assertArrayEquals(errorMessageFormat, totalFiles, ((ChannelCounters.Snapshot) queryCounterSnapshot).getTotalFiles());
}
if (segmentRowsProcessed != null) {
Assert.assertEquals(errorMessageFormat, segmentRowsProcessed.longValue(), ((SegmentGenerationProgressCounter.Snapshot) queryCounterSnapshot).getRowsProcessed());
}
}
}

View File

@ -70,6 +70,7 @@ import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.metadata.input.InputSourceModule;
import org.apache.druid.msq.counters.CounterNames;
import org.apache.druid.msq.counters.CounterSnapshots;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.counters.QueryCounterSnapshot;
@ -897,6 +898,18 @@ public class MSQTestBase extends BaseCalciteQueryTest
return asBuilder();
}
public Builder setExpectedSegmentGenerationProgressCountersForStageWorker(
CounterSnapshotMatcher counterSnapshot,
int stage,
int worker
)
{
this.expectedStageWorkerChannelToCounters.computeIfAbsent(stage, s -> new HashMap<>())
.computeIfAbsent(worker, w -> new HashMap<>())
.put(CounterNames.getSegmentGenerationProgress(), counterSnapshot);
return asBuilder();
}
@SuppressWarnings("unchecked")
private Builder asBuilder()
{

View File

@ -43,6 +43,10 @@ public class FireDepartmentMetrics
private final AtomicLong persistBackPressureMillis = new AtomicLong(0);
private final AtomicLong failedPersists = new AtomicLong(0);
private final AtomicLong failedHandoffs = new AtomicLong(0);
// Measures the number of rows that have been merged. Segments are merged into a single file before they are pushed to deep storage.
private final AtomicLong mergedRows = new AtomicLong(0);
// Measures the number of rows that have been pushed to deep storage.
private final AtomicLong pushedRows = new AtomicLong(0);
private final AtomicLong mergeTimeMillis = new AtomicLong(0);
private final AtomicLong mergeCpuTime = new AtomicLong(0);
private final AtomicLong persistCpuTime = new AtomicLong(0);
@ -114,6 +118,16 @@ public class FireDepartmentMetrics
mergeTimeMillis.addAndGet(millis);
}
public void incrementMergedRows(long rows)
{
mergedRows.addAndGet(rows);
}
public void incrementPushedRows(long rows)
{
pushedRows.addAndGet(rows);
}
public void incrementMergeCpuTime(long mergeTime)
{
mergeCpuTime.addAndGet(mergeTime);
@ -210,6 +224,15 @@ public class FireDepartmentMetrics
return failedHandoffs.get();
}
public long mergedRows()
{
return mergedRows.get();
}
public long pushedRows()
{
return pushedRows.get();
}
public long mergeTimeMillis()
{
return mergeTimeMillis.get();
@ -266,6 +289,8 @@ public class FireDepartmentMetrics
retVal.sinkCount.set(sinkCount.get());
retVal.messageMaxTimestamp.set(messageMaxTimestamp.get());
retVal.maxSegmentHandoffTime.set(maxSegmentHandoffTime.get());
retVal.mergedRows.set(mergedRows.get());
retVal.pushedRows.set(pushedRows.get());
long messageGapSnapshot = 0;
final long maxTimestamp = retVal.messageMaxTimestamp.get();

View File

@ -780,11 +780,15 @@ public class BatchAppenderator implements Appenderator
final long mergeFinishTime;
final long startTime = System.nanoTime();
List<QueryableIndex> indexes = new ArrayList<>();
long rowsinMergedSegment = 0L;
Closer closer = Closer.create();
try {
for (FireHydrant fireHydrant : sink) {
Pair<ReferenceCountingSegment, Closeable> segmentAndCloseable = fireHydrant.getAndIncrementSegment();
final QueryableIndex queryableIndex = segmentAndCloseable.lhs.asQueryableIndex();
if (queryableIndex != null) {
rowsinMergedSegment += queryableIndex.getNumRows();
}
log.debug("Segment[%s] adding hydrant[%s]", identifier, fireHydrant);
indexes.add(queryableIndex);
closer.register(segmentAndCloseable.rhs);
@ -804,6 +808,7 @@ public class BatchAppenderator implements Appenderator
);
mergeFinishTime = System.nanoTime();
metrics.incrementMergedRows(rowsinMergedSegment);
log.debug("Segment[%s] built in %,dms.", identifier, (mergeFinishTime - startTime) / 1000000);
}
@ -841,6 +846,7 @@ public class BatchAppenderator implements Appenderator
removeDirectory(computePersistDir(identifier));
final long pushFinishTime = System.nanoTime();
metrics.incrementPushedRows(rowsinMergedSegment);
log.info(
"Segment[%s] of %,d bytes "