Emit metrics for distribution of number of rows per segment (#12730)

* initial commit of bucket dimensions for metrics

return counts of segments whose row count falls into each bucket, per datasource
return the average row count per segment for each datasource
added a unit test
naming could use a lot of work
bucket boundaries are not yet finalized
added javadocs
altered metrics.md

* fix checkstyle issues

* addressed review comments

add monitor test
move added functionality to new monitor
update docs

* address comments

renamed monitor
handle tombstones better
update docs
added javadocs

* Add support for tombstones in the segment distribution

* undo changes to tombstone segmentizer factory

* fix accidental whitespace changes

* address comments regarding metrics documentation

and rename variable to be more accurate

* fix tests

* fix checkstyle issues

* fix broken test

* undo removal of timeout
TSFenwick 2022-07-12 07:04:42 -07:00 committed by GitHub
parent bb953be09b
commit 8c02880d5f
11 changed files with 885 additions and 7 deletions


@@ -380,6 +380,7 @@ Metric monitoring is an essential part of Druid operations. The following monit
|`org.apache.druid.java.util.metrics.CgroupMemoryMonitor`|Reports memory statistic as per the memory cgroup.|
|`org.apache.druid.server.metrics.EventReceiverFirehoseMonitor`|Reports how many events have been queued in the EventReceiverFirehose.|
|`org.apache.druid.server.metrics.HistoricalMetricsMonitor`|Reports statistics on Historical processes. Available only on Historical processes.|
|`org.apache.druid.server.metrics.SegmentStatsMonitor`|**EXPERIMENTAL** Reports statistics about segments on Historical processes. Available only on Historical processes. Not to be used when lazy loading is configured.|
|`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted.|
|`org.apache.druid.server.emitter.HttpEmittingMonitor`|Reports internal metrics of `http` or `parametrized` emitter (see below). Must not be used with another emitter type. See the description of the metrics here: https://github.com/apache/druid/pull/4973.|
|`org.apache.druid.server.metrics.TaskCountStatsMonitor`|Reports how many ingestion tasks are currently running/pending/waiting and also the number of successful/failed tasks per emission period.|
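
To enable the new monitor, add it to the monitor list in the runtime properties, for example (a sketch; merge it into whatever monitor list you already configure):

`druid.monitoring.monitors=["org.apache.druid.server.metrics.SegmentStatsMonitor"]`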


@@ -323,6 +323,8 @@ decisions.
|`segment/usedPercent`|Percentage of space used by served segments.|dataSource, tier, priority.|< 100%|
|`segment/count`|Number of served segments.|dataSource, tier, priority.|Varies.|
|`segment/pendingDelete`|On-disk size in bytes of segments that are waiting to be cleared out|Varies.|
|`segment/rowCount/avg`|The average number of rows per segment on a Historical process. `SegmentStatsMonitor` must be enabled.|dataSource, tier, priority.|Varies. See [segment optimization](../operations/segment-optimization.md) for guidance on optimal segment sizes.|
|`segment/rowCount/range/count`|The number of segments whose row count falls into the given range. `SegmentStatsMonitor` must be enabled.|dataSource, tier, priority, range.|Varies.|
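
The `range` dimension takes one of the bucket labels used by the monitor: `Tombstone`, `0`, `1-10k`, `10k-2M`, `2M-4M`, `4M-6M`, `6M-8M`, `8M-10M`, or `10M+`.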
### JVM


@@ -31,10 +31,12 @@ import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.join.table.IndexedTable;
import org.apache.druid.segment.join.table.ReferenceCountingIndexedTable;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.server.metrics.SegmentRowCountDistribution;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
@@ -74,17 +76,31 @@ public class SegmentManager
private final ConcurrentHashMap<SegmentId, ReferenceCountingIndexedTable> tablesLookup = new ConcurrentHashMap<>();
private long totalSegmentSize;
private long numSegments;
private long rowCount;
private final SegmentRowCountDistribution segmentRowCountDistribution = new SegmentRowCountDistribution();
private void addSegment(DataSegment segment)
private void addSegment(DataSegment segment, long numOfRows)
{
totalSegmentSize += segment.getSize();
numSegments++;
rowCount += numOfRows;
if (segment.isTombstone()) {
segmentRowCountDistribution.addTombstoneToDistribution();
} else {
segmentRowCountDistribution.addRowCountToDistribution(numOfRows);
}
}
private void removeSegment(DataSegment segment)
private void removeSegment(DataSegment segment, long numOfRows)
{
totalSegmentSize -= segment.getSize();
numSegments--;
rowCount -= numOfRows;
if (segment.isTombstone()) {
segmentRowCountDistribution.removeTombstoneFromDistribution();
} else {
segmentRowCountDistribution.removeRowCountFromDistribution(numOfRows);
}
}
public VersionedIntervalTimeline<String, ReferenceCountingSegment> getTimeline()
@@ -97,6 +113,11 @@ public class SegmentManager
return tablesLookup;
}
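// average rows per segment for this datasource; integer division truncates the result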
public long getAverageRowCount()
{
return numSegments == 0 ? 0 : rowCount / numSegments;
}
public long getTotalSegmentSize()
{
return totalSegmentSize;
@@ -111,8 +132,14 @@ public class SegmentManager
{
return numSegments == 0;
}
private SegmentRowCountDistribution getSegmentRowCountDistribution()
{
return segmentRowCountDistribution;
}
}
@Inject
public SegmentManager(
SegmentLoader segmentLoader
@@ -138,6 +165,16 @@ public class SegmentManager
return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getTotalSegmentSize);
}
public Map<String, Long> getAverageRowCountForDatasource()
{
return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getAverageRowCount);
}
public Map<String, SegmentRowCountDistribution> getRowCountDistribution()
{
return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getSegmentRowCountDistribution);
}
public Set<String> getDataSourceNames()
{
return dataSources.keySet();
@@ -265,7 +302,9 @@ public class SegmentManager
segment.getVersion(),
segment.getShardSpec().createChunk(adapter)
);
dataSourceState.addSegment(segment);
StorageAdapter storageAdapter = adapter.asStorageAdapter();
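// tombstones have no rows; a missing storage adapter likewise counts as zero rows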
long numOfRows = (segment.isTombstone() || storageAdapter == null) ? 0 : storageAdapter.getNumRows();
dataSourceState.addSegment(segment, numOfRows);
// Asynchronously load segment index files into the page cache using a thread pool
segmentLoader.loadSegmentIntoPageCache(segment, loadSegmentIntoPageCacheExec);
resultSupplier.set(true);
@@ -321,9 +360,13 @@ public class SegmentManager
);
final ReferenceCountingSegment oldQueryable = (removed == null) ? null : removed.getObject();
if (oldQueryable != null) {
try (final Closer closer = Closer.create()) {
dataSourceState.removeSegment(segment);
StorageAdapter storageAdapter = oldQueryable.asStorageAdapter();
long numOfRows = (segment.isTombstone() || storageAdapter == null) ? 0 : storageAdapter.getNumRows();
dataSourceState.removeSegment(segment, numOfRows);
closer.register(oldQueryable);
log.info("Attempting to close segment %s", segment.getId());
final ReferenceCountingIndexedTable oldTable = dataSourceState.tablesLookup.remove(segment.getId());


@@ -45,6 +45,7 @@ import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.metrics.SegmentRowCountDistribution;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
@@ -306,6 +307,16 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
}
}
public Map<String, Long> getAverageNumOfRowsPerSegmentForDatasource()
{
return segmentManager.getAverageRowCountForDatasource();
}
public Map<String, SegmentRowCountDistribution> getRowCountDistributionPerDatasource()
{
return segmentManager.getRowCountDistribution();
}
@Override
public void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback)
{


@@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.metrics;
import org.apache.druid.java.util.emitter.EmittingLogger;
import java.util.function.ObjIntConsumer;
/**
* Tracks the number of segments whose row counts fall into a set of predefined buckets
*/
public class SegmentRowCountDistribution
{
private static final EmittingLogger log = new EmittingLogger(SegmentRowCountDistribution.class);
private final int[] buckets = new int[9];
private static final int TOMBSTONE_BUCKET_INDEX = 0;
/**
* Increments the count for a particular bucket held in this class
*
* @param rowCount the row count used to determine which bucket to increment
*/
public void addRowCountToDistribution(long rowCount)
{
int bucketIndex = determineBucketFromRowCount(rowCount);
buckets[bucketIndex]++;
}
/**
* Decrements the count for a particular bucket held in this class
*
* @param rowCount the row count used to determine which bucket to decrement
*/
public void removeRowCountFromDistribution(long rowCount)
{
int bucketIndex = determineBucketFromRowCount(rowCount);
buckets[bucketIndex]--;
if (buckets[bucketIndex] < 0) {
// this should never happen; guard against a negative count anyway
log.error("Negative bucket count in rowCount distribution; resetting it to 0");
buckets[bucketIndex] = 0;
}
}
/**
* Increments the count for number of tombstones in the distribution
*/
public void addTombstoneToDistribution()
{
buckets[TOMBSTONE_BUCKET_INDEX]++;
}
/**
* Decrements the count for the number of tombstones in the distribution.
*/
public void removeTombstoneFromDistribution()
{
buckets[TOMBSTONE_BUCKET_INDEX]--;
}
/**
* Determines the dimension name used for a bucket. Should never return "NA" in practice, since this method is
* private and is only called with valid bucket indices
*
* @param index the index of the bucket
* @return the dimension which the bucket index refers to
*/
private static String getBucketDimensionFromIndex(int index)
{
switch (index) {
case 0:
return "Tombstone";
case 1:
return "0";
case 2:
return "1-10k";
case 3:
return "10k-2M";
case 4:
return "2M-4M";
case 5:
return "4M-6M";
case 6:
return "6M-8M";
case 7:
return "8M-10M";
case 8:
return "10M+";
// should never get to default
default:
return "NA";
}
}
/**
* Determines which bucket the specified rowCount belongs to. Upper bounds are inclusive: for example, 10,000 rows
* falls in the "1-10k" bucket while 10,001 rows falls in "10k-2M".
*
* @param rowCount the number of rows in a segment
* @return the bucket index
*/
private static int determineBucketFromRowCount(long rowCount)
{
// bucket index 0 is reserved for tombstones
if (rowCount <= 0L) {
return 1;
}
if (rowCount <= 10_000L) {
return 2;
}
if (rowCount <= 2_000_000L) {
return 3;
}
if (rowCount <= 4_000_000L) {
return 4;
}
if (rowCount <= 6_000_000L) {
return 5;
}
if (rowCount <= 8_000_000L) {
return 6;
}
if (rowCount <= 10_000_000L) {
return 7;
}
return 8;
}
/**
* Passes each range dimension and its associated count to the consumer. The "Tombstone" and "0" buckets are
* reported only when their counts are nonzero.
*
* @param consumer callback receiving the range dimension name and its count
*/
public void forEachDimension(final ObjIntConsumer<String> consumer)
{
for (int ii = 0; ii < buckets.length; ii++) {
// only report the "Tombstone" and "0" buckets when they have a nonzero value
if (ii > 1 || buckets[ii] != 0) {
consumer.accept(getBucketDimensionFromIndex(ii), buckets[ii]);
}
}
}
}
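
For illustration, a minimal standalone sketch of how the distribution class is used (a hypothetical snippet, not part of this diff):

SegmentRowCountDistribution distribution = new SegmentRowCountDistribution();
distribution.addRowCountToDistribution(5_000L);      // counted in the "1-10k" bucket
distribution.addRowCountToDistribution(3_000_000L);  // counted in the "2M-4M" bucket
distribution.addTombstoneToDistribution();           // counted in the "Tombstone" bucket
// the "Tombstone" and "0" buckets are skipped while empty; all other buckets are always reported
distribution.forEachDimension((range, count) -> System.out.println(range + " -> " + count));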


@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.metrics;
import com.google.inject.Inject;
import org.apache.druid.client.DruidServerConfig;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.AbstractMonitor;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.server.coordination.SegmentLoadDropHandler;
import java.util.Map;
/**
* An experimental monitor that reports segment stats. It can only be used on a Historical process and is not
* compatible with lazy loading.
*
* It tracks the average number of rows per segment and the distribution of segments by row count.
*/
public class SegmentStatsMonitor extends AbstractMonitor
{
private final DruidServerConfig serverConfig;
private final SegmentLoadDropHandler segmentLoadDropHandler;
private static final Logger log = new Logger(SegmentStatsMonitor.class);
/**
* Constructor for this monitor. Will throw IllegalStateException if lazy load on start is set to true.
*
* @param serverConfig
* @param segmentLoadDropHandler
* @param segmentLoaderConfig
*/
@Inject
public SegmentStatsMonitor(
DruidServerConfig serverConfig,
SegmentLoadDropHandler segmentLoadDropHandler,
SegmentLoaderConfig segmentLoaderConfig
)
{
if (segmentLoaderConfig.isLazyLoadOnStart()) {
// log the error so it is visible at startup, since the thrown exception itself isn't logged
log.error("Monitor doesn't support working with lazy loading on start");
// throw to kill the process at startup
throw new IllegalStateException("Monitor doesn't support working with lazy loading on start");
}
this.serverConfig = serverConfig;
this.segmentLoadDropHandler = segmentLoadDropHandler;
}
@Override
public boolean doMonitor(ServiceEmitter emitter)
{
for (Map.Entry<String, Long> entry : segmentLoadDropHandler.getAverageNumOfRowsPerSegmentForDatasource().entrySet()) {
String dataSource = entry.getKey();
long averageRowCount = entry.getValue();
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension("tier", serverConfig.getTier())
.setDimension("priority", String.valueOf(serverConfig.getPriority()));
emitter.emit(builder.build("segment/rowCount/avg", averageRowCount));
}
for (Map.Entry<String, SegmentRowCountDistribution> entry : segmentLoadDropHandler.getRowCountDistributionPerDatasource()
.entrySet()) {
String dataSource = entry.getKey();
SegmentRowCountDistribution rowCountBucket = entry.getValue();
rowCountBucket.forEachDimension((final String bucketDimension, final int count) -> {
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension("tier", serverConfig.getTier())
.setDimension("priority", String.valueOf(serverConfig.getPriority()));
builder.setDimension("range", bucketDimension);
ServiceEventBuilder<ServiceMetricEvent> output = builder.build("segment/rowCount/range/count", count);
emitter.emit(output);
});
}
return true;
}
}
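
In sum, each doMonitor pass emits one `segment/rowCount/avg` event per datasource plus one `segment/rowCount/range/count` event per reported bucket; every event carries the `dataSource`, `tier`, and `priority` dimensions, and the range events additionally carry `range`.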


@@ -19,15 +19,26 @@
package org.apache.druid.segment.loading;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.Metadata;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.concurrent.ExecutorService;
/**
@@ -61,7 +72,96 @@ public class CacheTestSegmentLoader implements SegmentLoader
@Override
public StorageAdapter asStorageAdapter()
{
throw new UnsupportedOperationException();
return new StorageAdapter()
{
@Override
public Interval getInterval()
{
throw new UnsupportedOperationException();
}
@Override
public Indexed<String> getAvailableDimensions()
{
throw new UnsupportedOperationException();
}
@Override
public Iterable<String> getAvailableMetrics()
{
throw new UnsupportedOperationException();
}
@Override
public int getDimensionCardinality(String column)
{
throw new UnsupportedOperationException();
}
@Override
public DateTime getMinTime()
{
throw new UnsupportedOperationException();
}
@Override
public DateTime getMaxTime()
{
throw new UnsupportedOperationException();
}
@Nullable
@Override
public Comparable getMinValue(String column)
{
throw new UnsupportedOperationException();
}
@Nullable
@Override
public Comparable getMaxValue(String column)
{
throw new UnsupportedOperationException();
}
@Nullable
@Override
public ColumnCapabilities getColumnCapabilities(String column)
{
throw new UnsupportedOperationException();
}
@Override
public int getNumRows()
{
return 1;
}
@Override
public DateTime getMaxIngestedEventTime()
{
throw new UnsupportedOperationException();
}
@Override
public Metadata getMetadata()
{
throw new UnsupportedOperationException();
}
@Override
public Sequence<Cursor> makeCursors(
@Nullable Filter filter,
Interval interval,
VirtualColumns virtualColumns,
Granularity gran,
boolean descending,
@Nullable QueryMetrics<?> queryMetrics
)
{
throw new UnsupportedOperationException();
}
};
}
@Override


@@ -45,6 +45,7 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.HashMap;
@@ -89,11 +90,14 @@ public class SegmentManagerTest
{
private final String version;
private final Interval interval;
private final StorageAdapter storageAdapter;
SegmentForTesting(String version, Interval interval)
{
this.version = version;
this.interval = interval;
storageAdapter = Mockito.mock(StorageAdapter.class);
Mockito.when(storageAdapter.getNumRows()).thenReturn(1);
}
public String getVersion()
@@ -127,7 +131,7 @@ public class SegmentManagerTest
@Override
public StorageAdapter asStorageAdapter()
{
throw new UnsupportedOperationException();
return storageAdapter;
}
@Override


@@ -56,6 +56,7 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import javax.annotation.Nullable;
import java.io.File;
@@ -227,6 +228,8 @@ public class SegmentManagerThreadSafetyTest
{
return new Segment()
{
StorageAdapter storageAdapter = Mockito.mock(StorageAdapter.class);
@Override
public SegmentId getId()
{
@@ -249,7 +252,8 @@
@Override
public StorageAdapter asStorageAdapter()
{
throw new UnsupportedOperationException();
Mockito.when(storageAdapter.getNumRows()).thenReturn(1);
return storageAdapter;
}
@Override


@@ -0,0 +1,233 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.metrics;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.function.ObjIntConsumer;
public class SegmentRowCountDistributionTest
{
private SegmentRowCountDistribution rowCountBucket;
@Before
public void setUp()
{
rowCountBucket = new SegmentRowCountDistribution();
}
@Test
public void test_bucketCountSanity()
{
// test base case
rowCountBucket.forEachDimension((final String dimension, final int count) -> {
Assert.assertEquals(0, count);
});
// tombstones
// add tombstones
rowCountBucket.addTombstoneToDistribution();
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 0));
rowCountBucket.addTombstoneToDistribution();
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 0));
// remove tombstones
rowCountBucket.removeTombstoneFromDistribution();
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 0));
rowCountBucket.removeTombstoneFromDistribution();
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 0));
// test bounds of 1st bucket
// with addition
rowCountBucket.addRowCountToDistribution(0);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 1));
rowCountBucket.addRowCountToDistribution(0);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 1));
// with removal
rowCountBucket.removeRowCountFromDistribution(0);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 1));
rowCountBucket.removeRowCountFromDistribution(0);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 1));
// test bounds of 2nd bucket
// with addition
rowCountBucket.addRowCountToDistribution(1);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 2));
rowCountBucket.addRowCountToDistribution(10_000);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 2));
// with removal
rowCountBucket.removeRowCountFromDistribution(1);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 2));
rowCountBucket.removeRowCountFromDistribution(10_000);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 2));
// test bounds of 3rd bucket
// with addition
rowCountBucket.addRowCountToDistribution(10_001);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 3));
rowCountBucket.addRowCountToDistribution(2_000_000);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 3));
// with removal
rowCountBucket.removeRowCountFromDistribution(10_001);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 3));
rowCountBucket.removeRowCountFromDistribution(2_000_000);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 3));
// test bounds of 4th bucket
// with addition
rowCountBucket.addRowCountToDistribution(2_000_001);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 4));
rowCountBucket.addRowCountToDistribution(4_000_000);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 4));
// with removal
rowCountBucket.removeRowCountFromDistribution(2_000_001);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 4));
rowCountBucket.removeRowCountFromDistribution(4_000_000);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 4));
// test bounds of 5th bucket
// with addition
rowCountBucket.addRowCountToDistribution(4_000_001);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 5));
rowCountBucket.addRowCountToDistribution(6_000_000);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 5));
// with removal
rowCountBucket.removeRowCountFromDistribution(4_000_001);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 5));
rowCountBucket.removeRowCountFromDistribution(6_000_000);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 5));
// test bounds of 6th bucket
// with addition
rowCountBucket.addRowCountToDistribution(6_000_001);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 6));
rowCountBucket.addRowCountToDistribution(8_000_000);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 6));
// with removal
rowCountBucket.removeRowCountFromDistribution(6_000_001);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 6));
rowCountBucket.removeRowCountFromDistribution(8_000_000);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 6));
// test bounds of 7th bucket
// with addition
rowCountBucket.addRowCountToDistribution(8_000_001);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 7));
rowCountBucket.addRowCountToDistribution(10_000_000);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 7));
// with removal
rowCountBucket.removeRowCountFromDistribution(8_000_001);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 7));
rowCountBucket.removeRowCountFromDistribution(10_000_000);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 7));
// test bounds of 8th bucket
// with addition
rowCountBucket.addRowCountToDistribution(10_000_001);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 8));
rowCountBucket.addRowCountToDistribution(Long.MAX_VALUE);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 8));
// with removal
rowCountBucket.removeRowCountFromDistribution(10_000_001);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 8));
rowCountBucket.removeRowCountFromDistribution(Long.MAX_VALUE);
rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 8));
}
// this is used to test part of the functionality in AssertBucketHasValue.assertExpected
@Test
public void test_bucketDimensionFromIndex()
{
Assert.assertEquals("Tombstone", getBucketDimensionFromIndex(0));
Assert.assertEquals("0", getBucketDimensionFromIndex(1));
Assert.assertEquals("1-10k", getBucketDimensionFromIndex(2));
Assert.assertEquals("10k-2M", getBucketDimensionFromIndex(3));
Assert.assertEquals("2M-4M", getBucketDimensionFromIndex(4));
Assert.assertEquals("4M-6M", getBucketDimensionFromIndex(5));
Assert.assertEquals("6M-8M", getBucketDimensionFromIndex(6));
Assert.assertEquals("8M-10M", getBucketDimensionFromIndex(7));
Assert.assertEquals("10M+", getBucketDimensionFromIndex(8));
Assert.assertEquals("NA", getBucketDimensionFromIndex(9));
}
private static class AssertBucketHasValue implements ObjIntConsumer<String>
{
private final int expectedBucket;
private final int expectedValue;
private AssertBucketHasValue(int expectedBucket, int expectedValue)
{
this.expectedBucket = expectedBucket;
this.expectedValue = expectedValue;
}
static AssertBucketHasValue assertExpected(int expectedValue, int expectedBucket)
{
return new AssertBucketHasValue(expectedBucket, expectedValue);
}
@Override
public void accept(String s, int value)
{
if (s.equals(getBucketDimensionFromIndex(expectedBucket))) {
Assert.assertEquals(expectedValue, value);
} else {
// assert all other values are empty
Assert.assertEquals(0, value);
}
}
}
// duplicated here so the test doesn't have to expose the bucket internals of SegmentRowCountDistribution
private static String getBucketDimensionFromIndex(int index)
{
switch (index) {
case 0:
return "Tombstone";
case 1:
return "0";
case 2:
return "1-10k";
case 3:
return "10k-2M";
case 4:
return "2M-4M";
case 5:
return "4M-6M";
case 6:
return "6M-8M";
case 7:
return "8M-10M";
case 8:
return "10M+";
// should never get to default
default:
return "NA";
}
}
}


@@ -0,0 +1,214 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.metrics;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.DruidServerConfig;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.server.coordination.SegmentLoadDropHandler;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
public class SegmentStatsMonitorTest
{
private static final String DATA_SOURCE = "dataSource";
private static final int PRIORITY = 111;
private static final String TIER = "tier";
private DruidServerConfig druidServerConfig;
private SegmentLoadDropHandler segmentLoadDropMgr;
private ServiceEmitter serviceEmitter;
private SegmentStatsMonitor monitor;
private final SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig();
@Before
public void setUp()
{
druidServerConfig = Mockito.mock(DruidServerConfig.class);
segmentLoadDropMgr = Mockito.mock(SegmentLoadDropHandler.class);
serviceEmitter = Mockito.mock(ServiceEmitter.class);
monitor = new SegmentStatsMonitor(
druidServerConfig,
segmentLoadDropMgr,
segmentLoaderConfig
);
Mockito.when(druidServerConfig.getTier()).thenReturn(TIER);
Mockito.when(druidServerConfig.getPriority()).thenReturn(PRIORITY);
}
@Test(expected = IllegalStateException.class)
public void testLazyLoadOnStartThrowsException()
{
SegmentLoaderConfig segmentLoaderConfig = Mockito.mock(SegmentLoaderConfig.class);
Mockito.when(segmentLoaderConfig.isLazyLoadOnStart()).thenReturn(true);
//should throw an exception here
new SegmentStatsMonitor(druidServerConfig, segmentLoadDropMgr, segmentLoaderConfig);
}
@Test
public void testSimple()
{
final SegmentRowCountDistribution segmentRowCountDistribution = new SegmentRowCountDistribution();
segmentRowCountDistribution.addRowCountToDistribution(100_000L);
Mockito.when(segmentLoadDropMgr.getAverageNumOfRowsPerSegmentForDatasource())
.thenReturn(ImmutableMap.of(DATA_SOURCE, 100_000L));
Mockito.when(segmentLoadDropMgr.getRowCountDistributionPerDatasource())
.thenReturn(ImmutableMap.of(DATA_SOURCE, segmentRowCountDistribution));
ArgumentCaptor<ServiceEventBuilder<ServiceMetricEvent>> eventArgumentCaptor = ArgumentCaptor.forClass(
ServiceEventBuilder.class);
monitor.doMonitor(serviceEmitter);
Mockito.verify(serviceEmitter, Mockito.atLeastOnce()).emit(eventArgumentCaptor.capture());
List<Map<String, Object>> eventsAsMaps = getEventMaps(eventArgumentCaptor.getAllValues());
Map<String, Map<String, Object>> actual = metricKeyedMap(eventsAsMaps);
List<ServiceEventBuilder<ServiceMetricEvent>> expectedEvents = new ArrayList<>();
expectedEvents.add(averageRowCountEvent(100_000L));
expectedEvents.add(rowCountRangeEvent("1-10k", 0));
expectedEvents.add(rowCountRangeEvent("10k-2M", 1));
expectedEvents.add(rowCountRangeEvent("2M-4M", 0));
expectedEvents.add(rowCountRangeEvent("4M-6M", 0));
expectedEvents.add(rowCountRangeEvent("6M-8M", 0));
expectedEvents.add(rowCountRangeEvent("8M-10M", 0));
expectedEvents.add(rowCountRangeEvent("10M+", 0));
List<Map<String, Object>> expectedEventsAsMap = getEventMaps(expectedEvents);
Map<String, Map<String, Object>> expected = metricKeyedMap(expectedEventsAsMap);
Assert.assertEquals("different number of metrics were returned", expected.size(), actual.size());
for (Map.Entry<String, Map<String, Object>> expectedKeyedEntry : expected.entrySet()) {
Map<String, Object> actualValue = actual.get(expectedKeyedEntry.getKey());
assertMetricMapsEqual(expectedKeyedEntry.getKey(), expectedKeyedEntry.getValue(), actualValue);
}
}
@Test
public void testZeroAndTombstoneDistribution()
{
final SegmentRowCountDistribution segmentRowCountDistribution = new SegmentRowCountDistribution();
segmentRowCountDistribution.addRowCountToDistribution(100_000L);
segmentRowCountDistribution.addRowCountToDistribution(0L);
segmentRowCountDistribution.addTombstoneToDistribution();
segmentRowCountDistribution.addTombstoneToDistribution();
Mockito.when(segmentLoadDropMgr.getAverageNumOfRowsPerSegmentForDatasource())
.thenReturn(ImmutableMap.of(DATA_SOURCE, 50_000L));
Mockito.when(segmentLoadDropMgr.getRowCountDistributionPerDatasource())
.thenReturn(ImmutableMap.of(DATA_SOURCE, segmentRowCountDistribution));
ArgumentCaptor<ServiceEventBuilder<ServiceMetricEvent>> eventArgumentCaptor = ArgumentCaptor.forClass(
ServiceEventBuilder.class);
monitor.doMonitor(serviceEmitter);
Mockito.verify(serviceEmitter, Mockito.atLeastOnce()).emit(eventArgumentCaptor.capture());
List<Map<String, Object>> eventsAsMaps = getEventMaps(eventArgumentCaptor.getAllValues());
Map<String, Map<String, Object>> actual = metricKeyedMap(eventsAsMaps);
List<ServiceEventBuilder<ServiceMetricEvent>> expectedEvents = new ArrayList<>();
expectedEvents.add(averageRowCountEvent(50_000L));
expectedEvents.add(rowCountRangeEvent("0", 1));
expectedEvents.add(rowCountRangeEvent("Tombstone", 2));
expectedEvents.add(rowCountRangeEvent("1-10k", 0));
expectedEvents.add(rowCountRangeEvent("10k-2M", 1));
expectedEvents.add(rowCountRangeEvent("2M-4M", 0));
expectedEvents.add(rowCountRangeEvent("4M-6M", 0));
expectedEvents.add(rowCountRangeEvent("6M-8M", 0));
expectedEvents.add(rowCountRangeEvent("8M-10M", 0));
expectedEvents.add(rowCountRangeEvent("10M+", 0));
List<Map<String, Object>> expectedEventsAsMap = getEventMaps(expectedEvents);
Map<String, Map<String, Object>> expected = metricKeyedMap(expectedEventsAsMap);
Assert.assertEquals("different number of metrics were returned", expected.size(), actual.size());
for (Map.Entry<String, Map<String, Object>> expectedKeyedEntry : expected.entrySet()) {
Map<String, Object> actualValue = actual.get(expectedKeyedEntry.getKey());
assertMetricMapsEqual(expectedKeyedEntry.getKey(), expectedKeyedEntry.getValue(), actualValue);
}
}
private void assertMetricMapsEqual(String messagePrefix, Map<String, Object> expected, Map<String, Object> actual)
{
Assert.assertEquals("different number of expected values for metrics", expected.size(), actual.size());
for (Map.Entry<String, Object> expectedMetricEntry : expected.entrySet()) {
Assert.assertEquals(
messagePrefix + " " + expectedMetricEntry.getKey(),
expectedMetricEntry.getValue(),
actual.get(expectedMetricEntry.getKey())
);
}
}
@Nonnull
private List<Map<String, Object>> getEventMaps(List<ServiceEventBuilder<ServiceMetricEvent>> eventBuilders)
{
return eventBuilders.stream()
.map(eventBuilder -> new HashMap<>(eventBuilder.build(ImmutableMap.of()).toMap()))
.peek(mappedValues -> mappedValues.remove("timestamp"))
.collect(Collectors.toList());
}
private Map<String, Map<String, Object>> metricKeyedMap(List<Map<String, Object>> eventsAsMaps)
{
return eventsAsMaps.stream()
.collect(
Collectors.toMap(event -> {
String metricName = event.get("metric").toString();
String range = event.getOrDefault("range", "").toString();
return metricName + range;
}, Function.identity())
);
}
private ServiceEventBuilder<ServiceMetricEvent> averageRowCountEvent(Number value)
{
return new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, DATA_SOURCE)
.setDimension("tier", TIER)
.setDimension("priority", String.valueOf(PRIORITY))
.build("segment/rowCount/avg", value);
}
private ServiceEventBuilder<ServiceMetricEvent> rowCountRangeEvent(String range, Number value)
{
return new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, DATA_SOURCE)
.setDimension("tier", TIER)
.setDimension("priority", String.valueOf(PRIORITY))
.setDimension("range", range)
.build("segment/rowCount/range/count", value);
}
}