From 8c02880d5f3e48d3e8189a240eae391eb6107862 Mon Sep 17 00:00:00 2001
From: TSFenwick
Date: Tue, 12 Jul 2022 07:04:42 -0700
Subject: [PATCH] Emit metrics for distribution of number of rows per segment
 (#12730)

* initial commit of bucket dimensions for metrics

  return counts of segments that have rowcount in a bucket size for a datasource
  return average value of rowcount per segment in a datasource
  added unit test
  naming could use a lot of work
  buckets right now are not finalized
  added javadocs
  altered metrics.md

* fix checkstyle issues

* addressed review comments

  add monitor test
  move added functionality to new monitor
  update docs

* address comments

  renamed monitor
  handle tombstones better
  update docs
  added javadocs

* Add support for tombstones in the segment distribution

* undo changes to tombstone segmentizer factory

* fix accidental whitespacing changes

* address comments regarding metrics documentation and rename variable to be more accurate

* fix tests

* fix checkstyle issues

* fix broken test

* undo removal of timeout
---
 docs/configuration/index.md                   |   1 +
 docs/operations/metrics.md                    |   2 +
 .../apache/druid/server/SegmentManager.java   |  51 +++-
 .../coordination/SegmentLoadDropHandler.java  |  11 +
 .../metrics/SegmentRowCountDistribution.java  | 161 ++++++++++++
 .../server/metrics/SegmentStatsMonitor.java   | 105 ++++++++
 .../loading/CacheTestSegmentLoader.java       | 102 +++++++-
 .../druid/server/SegmentManagerTest.java      |   6 +-
 .../SegmentManagerThreadSafetyTest.java       |   6 +-
 .../SegmentRowCountDistributionTest.java      | 233 ++++++++++++++++++
 .../metrics/SegmentStatsMonitorTest.java      | 214 ++++++++++++++++
 11 files changed, 885 insertions(+), 7 deletions(-)
 create mode 100644 server/src/main/java/org/apache/druid/server/metrics/SegmentRowCountDistribution.java
 create mode 100644 server/src/main/java/org/apache/druid/server/metrics/SegmentStatsMonitor.java
 create mode 100644 server/src/test/java/org/apache/druid/server/metrics/SegmentRowCountDistributionTest.java
 create mode 100644 server/src/test/java/org/apache/druid/server/metrics/SegmentStatsMonitorTest.java

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index a38f79ebf44..32cedc5e7fe 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -380,6 +380,7 @@ Metric monitoring is an essential part of Druid operations. The following monit
 |`org.apache.druid.java.util.metrics.CgroupMemoryMonitor`|Reports memory statistic as per the memory cgroup.|
 |`org.apache.druid.server.metrics.EventReceiverFirehoseMonitor`|Reports how many events have been queued in the EventReceiverFirehose.|
 |`org.apache.druid.server.metrics.HistoricalMetricsMonitor`|Reports statistics on Historical processes. Available only on Historical processes.|
+|`org.apache.druid.server.metrics.SegmentStatsMonitor`|**EXPERIMENTAL** Reports statistics about segments on Historical processes. Available only on Historical processes. Not to be used when lazy loading is configured.|
 |`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted.|
 |`org.apache.druid.server.emitter.HttpEmittingMonitor`|Reports internal metrics of `http` or `parametrized` emitter (see below). Must not be used with another emitter type. See the description of the metrics here: https://github.com/apache/druid/pull/4973.|
 |`org.apache.druid.server.metrics.TaskCountStatsMonitor`|Reports how many ingestion tasks are currently running/pending/waiting and also the number of successful/failed tasks per emission period.|
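For context, the new monitor is enabled like any other monitor, through the `druid.monitoring.monitors` list on the Historical. A minimal `runtime.properties` sketch (the property names are standard Druid configuration; the values are illustrative):

```properties
# Historical runtime.properties (sketch)
druid.monitoring.monitors=["org.apache.druid.server.metrics.SegmentStatsMonitor"]
# Lazy loading must stay disabled (the default); the monitor throws IllegalStateException at startup otherwise.
druid.segmentCache.lazyLoadOnStart=false
```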
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 59fbb9f9081..e427532cc71 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -323,6 +323,8 @@ decisions.
 |`segment/usedPercent`|Percentage of space used by served segments.|dataSource, tier, priority.|< 100%|
 |`segment/count`|Number of served segments.|dataSource, tier, priority.|Varies.|
 |`segment/pendingDelete`|On-disk size in bytes of segments that are waiting to be cleared out|Varies.|
+|`segment/rowCount/avg`|The average number of rows per segment on a Historical. `SegmentStatsMonitor` must be enabled.|dataSource, tier, priority.|Varies. See [segment optimization](../operations/segment-optimization.md) for guidance on optimal segment sizes.|
+|`segment/rowCount/range/count`|The number of segments whose row count falls into the given `range` bucket. `SegmentStatsMonitor` must be enabled.|dataSource, tier, priority, range.|Varies.|
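A worked example of how the two metrics relate, using the bucket bounds defined later in this patch (numbers are illustrative): a datasource serving three segments of 5,000, 1,000,000, and 3,000,000 rows reports `segment/rowCount/avg` = (5,000 + 1,000,000 + 3,000,000) / 3 = 1,335,000, and `segment/rowCount/range/count` = 1 for each of the `1-10k`, `10k-2M`, and `2M-4M` ranges (0 for the remaining ranges).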
diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java
index 329fc16248b..ff509272a81 100644
--- a/server/src/main/java/org/apache/druid/server/SegmentManager.java
+++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java
@@ -31,10 +31,12 @@ import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.segment.ReferenceCountingSegment;
 import org.apache.druid.segment.SegmentLazyLoadFailCallback;
+import org.apache.druid.segment.StorageAdapter;
 import org.apache.druid.segment.join.table.IndexedTable;
 import org.apache.druid.segment.join.table.ReferenceCountingIndexedTable;
 import org.apache.druid.segment.loading.SegmentLoader;
 import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.server.metrics.SegmentRowCountDistribution;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
@@ -74,17 +76,31 @@ public class SegmentManager
     private final ConcurrentHashMap<SegmentId, ReferenceCountingIndexedTable> tablesLookup = new ConcurrentHashMap<>();
     private long totalSegmentSize;
     private long numSegments;
+    private long rowCount;
+    private final SegmentRowCountDistribution segmentRowCountDistribution = new SegmentRowCountDistribution();
 
-    private void addSegment(DataSegment segment)
+    private void addSegment(DataSegment segment, long numOfRows)
     {
       totalSegmentSize += segment.getSize();
       numSegments++;
+      rowCount += numOfRows;
+      if (segment.isTombstone()) {
+        segmentRowCountDistribution.addTombstoneToDistribution();
+      } else {
+        segmentRowCountDistribution.addRowCountToDistribution(numOfRows);
+      }
     }
 
-    private void removeSegment(DataSegment segment)
+    private void removeSegment(DataSegment segment, long numOfRows)
     {
       totalSegmentSize -= segment.getSize();
       numSegments--;
+      rowCount -= numOfRows;
+      if (segment.isTombstone()) {
+        segmentRowCountDistribution.removeTombstoneFromDistribution();
+      } else {
+        segmentRowCountDistribution.removeRowCountFromDistribution(numOfRows);
+      }
     }
 
     public VersionedIntervalTimeline<String, ReferenceCountingSegment> getTimeline()
@@ -97,6 +113,11 @@ public class SegmentManager
       return tablesLookup;
     }
 
+    public long getAverageRowCount()
+    {
+      return numSegments == 0 ? 0 : rowCount / numSegments;
+    }
+
     public long getTotalSegmentSize()
     {
       return totalSegmentSize;
@@ -111,8 +132,14 @@ public class SegmentManager
     {
       return numSegments == 0;
     }
+
+    private SegmentRowCountDistribution getSegmentRowCountDistribution()
+    {
+      return segmentRowCountDistribution;
+    }
   }
 
+
   @Inject
   public SegmentManager(
       SegmentLoader segmentLoader
@@ -138,6 +165,16 @@ public class SegmentManager
     return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getTotalSegmentSize);
   }
 
+  public Map<String, Long> getAverageRowCountForDatasource()
+  {
+    return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getAverageRowCount);
+  }
+
+  public Map<String, SegmentRowCountDistribution> getRowCountDistribution()
+  {
+    return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getSegmentRowCountDistribution);
+  }
+
   public Set<String> getDataSourceNames()
   {
     return dataSources.keySet();
@@ -265,7 +302,9 @@ public class SegmentManager
             segment.getVersion(),
             segment.getShardSpec().createChunk(adapter)
         );
-        dataSourceState.addSegment(segment);
+        StorageAdapter storageAdapter = adapter.asStorageAdapter();
+        long numOfRows = (segment.isTombstone() || storageAdapter == null) ? 0 : storageAdapter.getNumRows();
+        dataSourceState.addSegment(segment, numOfRows);
         // Asyncly load segment index files into page cache in a thread pool
         segmentLoader.loadSegmentIntoPageCache(segment, loadSegmentIntoPageCacheExec);
         resultSupplier.set(true);
@@ -321,9 +360,13 @@ public class SegmentManager
     );
     final ReferenceCountingSegment oldQueryable = (removed == null) ? null : removed.getObject();
+
     if (oldQueryable != null) {
       try (final Closer closer = Closer.create()) {
-        dataSourceState.removeSegment(segment);
+        StorageAdapter storageAdapter = oldQueryable.asStorageAdapter();
+        long numOfRows = (segment.isTombstone() || storageAdapter == null) ? 0 : storageAdapter.getNumRows();
+        dataSourceState.removeSegment(segment, numOfRows);
         closer.register(oldQueryable);
         log.info("Attempting to close segment %s", segment.getId());
         final ReferenceCountingIndexedTable oldTable = dataSourceState.tablesLookup.remove(segment.getId());
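Note that `getAverageRowCount` above uses long division, so the reported average truncates any fractional rows. A jshell-style sketch (hypothetical numbers):

```java
long rowCount = 10;       // total rows across a datasource's segments
long numSegments = 3;
long avg = numSegments == 0 ? 0 : rowCount / numSegments;  // same expression as getAverageRowCount()
System.out.println(avg);  // prints 3, not 3.33: the fraction is dropped
```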
diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
index 5d4bf76018b..ba918d8fbd1 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
@@ -45,6 +45,7 @@ import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.loading.SegmentLoadingException;
 import org.apache.druid.server.SegmentManager;
+import org.apache.druid.server.metrics.SegmentRowCountDistribution;
 import org.apache.druid.timeline.DataSegment;
 
 import javax.annotation.Nullable;
@@ -306,6 +307,16 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
     }
   }
 
+  public Map<String, Long> getAverageNumOfRowsPerSegmentForDatasource()
+  {
+    return segmentManager.getAverageRowCountForDatasource();
+  }
+
+  public Map<String, SegmentRowCountDistribution> getRowCountDistributionPerDatasource()
+  {
+    return segmentManager.getRowCountDistribution();
+  }
+
   @Override
   public void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback)
   {
diff --git a/server/src/main/java/org/apache/druid/server/metrics/SegmentRowCountDistribution.java b/server/src/main/java/org/apache/druid/server/metrics/SegmentRowCountDistribution.java
new file mode 100644
index 00000000000..337df6384d6
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/metrics/SegmentRowCountDistribution.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import org.apache.druid.java.util.emitter.EmittingLogger;
+
+import java.util.function.ObjIntConsumer;
+
+/**
+ * Maintains counts of segments bucketed by row count, so the distribution of segment sizes (in rows) can be reported.
+ */
+public class SegmentRowCountDistribution
+{
+  private static final EmittingLogger log = new EmittingLogger(SegmentRowCountDistribution.class);
+
+  private final int[] buckets = new int[9];
+  private static final int TOMBSTONE_BUCKET_INDEX = 0;
+
+  /**
+   * Increments the count for the bucket that the given row count falls into.
+   *
+   * @param rowCount the number of rows, used to determine which bucket to increment
+   */
+  public void addRowCountToDistribution(long rowCount)
+  {
+    int bucketIndex = determineBucketFromRowCount(rowCount);
+    buckets[bucketIndex]++;
+  }
+
+  /**
+   * Decrements the count for the bucket that the given row count falls into.
+   *
+   * @param rowCount the number of rows, used to determine which bucket to decrement
+   */
+  public void removeRowCountFromDistribution(long rowCount)
+  {
+    int bucketIndex = determineBucketFromRowCount(rowCount);
+    buckets[bucketIndex]--;
+    if (buckets[bucketIndex] < 0) {
+      // this should never go negative; guard against underflow just in case
+      log.error("somehow got a count of less than 0, resetting value to 0");
+      buckets[bucketIndex] = 0;
+    }
+  }
+
+  /**
+   * Increments the count of tombstones in the distribution.
+   */
+  public void addTombstoneToDistribution()
+  {
+    buckets[TOMBSTONE_BUCKET_INDEX]++;
+  }
+
+  /**
+   * Decrements the count of tombstones in the distribution.
+   */
+  public void removeTombstoneFromDistribution()
+  {
+    buckets[TOMBSTONE_BUCKET_INDEX]--;
+  }
+
+  /**
+   * Determines the name of the range dimension used for a bucket. Should never return "NA": this method is private,
+   * and all callers pass bucket indexes produced by {@link #determineBucketFromRowCount}.
+   *
+   * @param index the index of the bucket
+   * @return the dimension value which the bucket index refers to
+   */
+  private static String getBucketDimensionFromIndex(int index)
+  {
+    switch (index) {
+      case 0:
+        return "Tombstone";
+      case 1:
+        return "0";
+      case 2:
+        return "1-10k";
+      case 3:
+        return "10k-2M";
+      case 4:
+        return "2M-4M";
+      case 5:
+        return "4M-6M";
+      case 6:
+        return "6M-8M";
+      case 7:
+        return "8M-10M";
+      case 8:
+        return "10M+";
+      // should never get to default
+      default:
+        return "NA";
+    }
+  }
+
+  /**
+   * Figures out which bucket the specified rowCount belongs to.
+   *
+   * @param rowCount the number of rows in a segment
+   * @return the bucket index
+   */
+  private static int determineBucketFromRowCount(long rowCount)
+  {
+    // bucket index 0 is reserved for tombstones
+    if (rowCount <= 0L) {
+      return 1;
+    }
+    if (rowCount <= 10_000L) {
+      return 2;
+    }
+    if (rowCount <= 2_000_000L) {
+      return 3;
+    }
+    if (rowCount <= 4_000_000L) {
+      return 4;
+    }
+    if (rowCount <= 6_000_000L) {
+      return 5;
+    }
+    if (rowCount <= 8_000_000L) {
+      return 6;
+    }
+    if (rowCount <= 10_000_000L) {
+      return 7;
+    }
+    return 8;
+  }
+  /**
+   * Gives the consumer each range dimension and its associated count. The "Tombstone" and "0" ranges are reported
+   * only when their counts are nonzero; every other range is always reported.
+   *
+   * @param consumer the callback receiving (range dimension, count) pairs
+   */
+  public void forEachDimension(final ObjIntConsumer<String> consumer)
+  {
+    for (int ii = 0; ii < buckets.length; ii++) {
+      // only report the tombstone and 0 buckets if they have a nonzero value
+      if (ii > 1 || buckets[ii] != 0) {
+        consumer.accept(getBucketDimensionFromIndex(ii), buckets[ii]);
+      }
+    }
+  }
+}
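A quick usage sketch of the class above (the `DistributionDemo` wrapper is hypothetical; the printed output follows the bucket-index order and the suppression rule described in `forEachDimension`):

```java
import org.apache.druid.server.metrics.SegmentRowCountDistribution;

public class DistributionDemo
{
  public static void main(String[] args)
  {
    SegmentRowCountDistribution dist = new SegmentRowCountDistribution();
    dist.addRowCountToDistribution(5_000);      // falls in the "1-10k" bucket
    dist.addRowCountToDistribution(3_000_000);  // falls in the "2M-4M" bucket
    dist.addTombstoneToDistribution();          // tracked separately under "Tombstone"

    // Prints: Tombstone=1, 1-10k=1, 10k-2M=0, 2M-4M=1, 4M-6M=0, 6M-8M=0, 8M-10M=0, 10M+=0
    // The "0" range is suppressed because its count is zero.
    dist.forEachDimension((range, count) -> System.out.println(range + "=" + count));
  }
}
```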
+ log.error("Monitor doesn't support working with lazy loading on start"); + // throw this exception it kill the process at startup + throw new IllegalStateException("Monitor doesn't support working with lazy loading on start"); + } + this.serverConfig = serverConfig; + this.segmentLoadDropHandler = segmentLoadDropHandler; + + } + + @Override + public boolean doMonitor(ServiceEmitter emitter) + { + for (Map.Entry entry : segmentLoadDropHandler.getAverageNumOfRowsPerSegmentForDatasource().entrySet()) { + String dataSource = entry.getKey(); + long averageSize = entry.getValue(); + final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder() + .setDimension(DruidMetrics.DATASOURCE, dataSource) + .setDimension("tier", serverConfig.getTier()) + .setDimension("priority", String.valueOf(serverConfig.getPriority())); + emitter.emit(builder.build("segment/rowCount/avg", averageSize)); + } + + for (Map.Entry entry : segmentLoadDropHandler.getRowCountDistributionPerDatasource() + .entrySet()) { + String dataSource = entry.getKey(); + SegmentRowCountDistribution rowCountBucket = entry.getValue(); + + rowCountBucket.forEachDimension((final String bucketDimension, final int count) -> { + final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder() + .setDimension(DruidMetrics.DATASOURCE, dataSource) + .setDimension("tier", serverConfig.getTier()) + .setDimension("priority", String.valueOf(serverConfig.getPriority())); + builder.setDimension("range", bucketDimension); + ServiceEventBuilder output = builder.build("segment/rowCount/range/count", count); + emitter.emit(output); + }); + } + + return true; + } +} diff --git a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java index 941b459c73b..f80688276c3 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java +++ b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java @@ -19,15 +19,26 @@ package org.apache.druid.segment.loading; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.query.QueryMetrics; +import org.apache.druid.query.filter.Filter; +import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.Metadata; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.Segment; import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.segment.StorageAdapter; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.data.Indexed; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; +import org.joda.time.DateTime; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.util.concurrent.ExecutorService; /** @@ -61,7 +72,96 @@ public class CacheTestSegmentLoader implements SegmentLoader @Override public StorageAdapter asStorageAdapter() { - throw new UnsupportedOperationException(); + return new StorageAdapter() + { + @Override + public Interval getInterval() + { + throw new UnsupportedOperationException(); + } + + @Override + public Indexed getAvailableDimensions() + { + throw new UnsupportedOperationException(); + } + + @Override + public Iterable getAvailableMetrics() + { + throw new 
diff --git a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
index 941b459c73b..f80688276c3 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
@@ -19,15 +19,26 @@
 
 package org.apache.druid.segment.loading;
 
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.query.QueryMetrics;
+import org.apache.druid.query.filter.Filter;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.Metadata;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.ReferenceCountingSegment;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.SegmentLazyLoadFailCallback;
 import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.data.Indexed;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
+import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
+import javax.annotation.Nullable;
 import java.util.concurrent.ExecutorService;
 
 /**
@@ -61,7 +72,96 @@ public class CacheTestSegmentLoader implements SegmentLoader
       @Override
       public StorageAdapter asStorageAdapter()
       {
-        throw new UnsupportedOperationException();
+        return new StorageAdapter()
+        {
+          @Override
+          public Interval getInterval()
+          {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public Indexed<String> getAvailableDimensions()
+          {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public Iterable<String> getAvailableMetrics()
+          {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public int getDimensionCardinality(String column)
+          {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public DateTime getMinTime()
+          {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public DateTime getMaxTime()
+          {
+            throw new UnsupportedOperationException();
+          }
+
+          @Nullable
+          @Override
+          public Comparable getMinValue(String column)
+          {
+            throw new UnsupportedOperationException();
+          }
+
+          @Nullable
+          @Override
+          public Comparable getMaxValue(String column)
+          {
+            throw new UnsupportedOperationException();
+          }
+
+          @Nullable
+          @Override
+          public ColumnCapabilities getColumnCapabilities(String column)
+          {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public int getNumRows()
+          {
+            return 1;
+          }
+
+          @Override
+          public DateTime getMaxIngestedEventTime()
+          {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public Metadata getMetadata()
+          {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public Sequence<Cursor> makeCursors(
+              @Nullable Filter filter,
+              Interval interval,
+              VirtualColumns virtualColumns,
+              Granularity gran,
+              boolean descending,
+              @Nullable QueryMetrics<?> queryMetrics
+          )
+          {
+            throw new UnsupportedOperationException();
+          }
+        };
       }
 
       @Override
diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
index a64078b4d7e..9d1ac4c4e2b 100644
--- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
@@ -45,6 +45,7 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -89,11 +90,14 @@ public class SegmentManagerTest
   {
     private final String version;
     private final Interval interval;
+    private final StorageAdapter storageAdapter;
 
     SegmentForTesting(String version, Interval interval)
     {
       this.version = version;
       this.interval = interval;
+      storageAdapter = Mockito.mock(StorageAdapter.class);
+      Mockito.when(storageAdapter.getNumRows()).thenReturn(1);
     }
 
     public String getVersion()
@@ -127,7 +131,7 @@ public class SegmentManagerTest
     @Override
     public StorageAdapter asStorageAdapter()
     {
-      throw new UnsupportedOperationException();
+      return storageAdapter;
     }
 
     @Override
diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
index b1de3a22ee6..27abe091615 100644
--- a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
@@ -56,6 +56,7 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
 
 import javax.annotation.Nullable;
 import java.io.File;
@@ -227,6 +228,8 @@ public class SegmentManagerThreadSafetyTest
   {
     return new Segment()
     {
+      StorageAdapter storageAdapter = Mockito.mock(StorageAdapter.class);
+
       @Override
       public SegmentId getId()
       {
@@ -249,7 +252,8 @@ public class SegmentManagerThreadSafetyTest
       @Override
      public StorageAdapter asStorageAdapter()
       {
-        throw new UnsupportedOperationException();
+        Mockito.when(storageAdapter.getNumRows()).thenReturn(1);
+        return storageAdapter;
       }
 
       @Override
diff --git a/server/src/test/java/org/apache/druid/server/metrics/SegmentRowCountDistributionTest.java b/server/src/test/java/org/apache/druid/server/metrics/SegmentRowCountDistributionTest.java
new file mode 100644
index 00000000000..b5ebfe0174d
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/metrics/SegmentRowCountDistributionTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.function.ObjIntConsumer;
+
+
+public class SegmentRowCountDistributionTest
+{
+
+  private SegmentRowCountDistribution rowCountBucket;
+
+  @Before
+  public void setUp()
+  {
+    rowCountBucket = new SegmentRowCountDistribution();
+  }
+
+  @Test
+  public void test_bucketCountSanity()
+  {
+    // test base case
+    rowCountBucket.forEachDimension((final String dimension, final int count) -> {
+      Assert.assertEquals(0, count);
+    });
+
+    // tombstones
+    // add tombstones
+    rowCountBucket.addTombstoneToDistribution();
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 0));
+    rowCountBucket.addTombstoneToDistribution();
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 0));
+    // remove tombstones
+    rowCountBucket.removeTombstoneFromDistribution();
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 0));
+    rowCountBucket.removeTombstoneFromDistribution();
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 0));
+
+    // test bounds of 1st bucket
+    // with addition
+    rowCountBucket.addRowCountToDistribution(0);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 1));
+    rowCountBucket.addRowCountToDistribution(0);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 1));
+    // with removal
+    rowCountBucket.removeRowCountFromDistribution(0);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 1));
+    rowCountBucket.removeRowCountFromDistribution(0);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 1));
+
+    // test bounds of 2nd bucket
+    // with addition
+    rowCountBucket.addRowCountToDistribution(1);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 2));
+    rowCountBucket.addRowCountToDistribution(10_000);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 2));
+    // with removal
+    rowCountBucket.removeRowCountFromDistribution(1);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 2));
+    rowCountBucket.removeRowCountFromDistribution(10_000);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 2));
+
+    // test bounds of 3rd bucket
+    // with addition
+    rowCountBucket.addRowCountToDistribution(10_001);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 3));
+    rowCountBucket.addRowCountToDistribution(2_000_000);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 3));
+    // with removal
+    rowCountBucket.removeRowCountFromDistribution(10_001);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 3));
+    rowCountBucket.removeRowCountFromDistribution(2_000_000);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 3));
+
+    // test bounds of 4th bucket
+    // with addition
+    rowCountBucket.addRowCountToDistribution(2_000_001);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 4));
+    rowCountBucket.addRowCountToDistribution(4_000_000);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 4));
+    // with removal
+    rowCountBucket.removeRowCountFromDistribution(2_000_001);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 4));
+    rowCountBucket.removeRowCountFromDistribution(4_000_000);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 4));
+
+    // test bounds of 5th bucket
+    // with addition
+    rowCountBucket.addRowCountToDistribution(4_000_001);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 5));
+    rowCountBucket.addRowCountToDistribution(6_000_000);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 5));
+    // with removal
+    rowCountBucket.removeRowCountFromDistribution(4_000_001);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 5));
+    rowCountBucket.removeRowCountFromDistribution(6_000_000);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 5));
+
+    // test bounds of 6th bucket
+    // with addition
+    rowCountBucket.addRowCountToDistribution(6_000_001);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 6));
+    rowCountBucket.addRowCountToDistribution(8_000_000);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 6));
+    // with removal
+    rowCountBucket.removeRowCountFromDistribution(6_000_001);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 6));
+    rowCountBucket.removeRowCountFromDistribution(8_000_000);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 6));
+
+    // test bounds of 7th bucket
+    // with addition
+    rowCountBucket.addRowCountToDistribution(8_000_001);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 7));
+    rowCountBucket.addRowCountToDistribution(10_000_000);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 7));
+    // with removal
+    rowCountBucket.removeRowCountFromDistribution(8_000_001);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 7));
+    rowCountBucket.removeRowCountFromDistribution(10_000_000);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 7));
+
+    // test bounds of 8th bucket
+    // with addition
+    rowCountBucket.addRowCountToDistribution(10_000_001);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 8));
+    rowCountBucket.addRowCountToDistribution(Long.MAX_VALUE);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(2, 8));
+    // with removal
+    rowCountBucket.removeRowCountFromDistribution(10_000_001);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(1, 8));
+    rowCountBucket.removeRowCountFromDistribution(Long.MAX_VALUE);
+    rowCountBucket.forEachDimension(AssertBucketHasValue.assertExpected(0, 8));
+  }
+
+  // this is used to test part of the functionality in AssertBucketHasValue.assertExpected
+  @Test
+  public void test_bucketDimensionFromIndex()
+  {
+    Assert.assertEquals("Tombstone", getBucketDimensionFromIndex(0));
+    Assert.assertEquals("0", getBucketDimensionFromIndex(1));
+    Assert.assertEquals("1-10k", getBucketDimensionFromIndex(2));
+    Assert.assertEquals("10k-2M", getBucketDimensionFromIndex(3));
+    Assert.assertEquals("2M-4M", getBucketDimensionFromIndex(4));
+    Assert.assertEquals("4M-6M", getBucketDimensionFromIndex(5));
+    Assert.assertEquals("6M-8M", getBucketDimensionFromIndex(6));
+    Assert.assertEquals("8M-10M", getBucketDimensionFromIndex(7));
+    Assert.assertEquals("10M+", getBucketDimensionFromIndex(8));
+    Assert.assertEquals("NA", getBucketDimensionFromIndex(9));
+  }
+
+  private static class AssertBucketHasValue implements ObjIntConsumer<String>
+  {
+    private final int expectedBucket;
+    private final int expectedValue;
+
+    private AssertBucketHasValue(int expectedBucket, int expectedValue)
+    {
+      this.expectedBucket = expectedBucket;
+      this.expectedValue = expectedValue;
+    }
+
+    static AssertBucketHasValue assertExpected(int expectedValue, int expectedBucket)
+    {
+      return new AssertBucketHasValue(expectedBucket, expectedValue);
+    }
+
+    @Override
+    public void accept(String s, int value)
+    {
+      if (s.equals(getBucketDimensionFromIndex(expectedBucket))) {
+        Assert.assertEquals(expectedValue, value);
+      } else {
+        // assert all other values are empty
+        Assert.assertEquals(0, value);
+      }
+    }
+  }
+
+  // duplicated here because the test deliberately does not expose the bucket internals of SegmentRowCountDistribution
+  private static String getBucketDimensionFromIndex(int index)
+  {
+    switch (index) {
+      case 0:
+        return "Tombstone";
+      case 1:
+        return "0";
+      case 2:
+        return "1-10k";
+      case 3:
+        return "10k-2M";
+      case 4:
+        return "2M-4M";
+      case 5:
+        return "4M-6M";
+      case 6:
+        return "6M-8M";
+      case 7:
+        return "8M-10M";
+      case 8:
+        return "10M+";
+      // should never get to default
+      default:
+        return "NA";
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/metrics/SegmentStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/SegmentStatsMonitorTest.java
new file mode 100644
index 00000000000..cb08a39fc36
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/metrics/SegmentStatsMonitorTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.client.DruidServerConfig;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
+import org.apache.druid.server.coordination.SegmentLoadDropHandler;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import javax.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class SegmentStatsMonitorTest
+{
+  private static final String DATA_SOURCE = "dataSource";
+  private static final int PRIORITY = 111;
+  private static final String TIER = "tier";
+
+  private DruidServerConfig druidServerConfig;
+  private SegmentLoadDropHandler segmentLoadDropMgr;
+  private ServiceEmitter serviceEmitter;
+  private SegmentStatsMonitor monitor;
+  private final SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig();
+
+  @Before
+  public void setUp()
+  {
+    druidServerConfig = Mockito.mock(DruidServerConfig.class);
+    segmentLoadDropMgr = Mockito.mock(SegmentLoadDropHandler.class);
+    serviceEmitter = Mockito.mock(ServiceEmitter.class);
+    monitor = new SegmentStatsMonitor(
+        druidServerConfig,
+        segmentLoadDropMgr,
+        segmentLoaderConfig
+    );
+    Mockito.when(druidServerConfig.getTier()).thenReturn(TIER);
+    Mockito.when(druidServerConfig.getPriority()).thenReturn(PRIORITY);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testLazyLoadOnStartThrowsException()
+  {
+    SegmentLoaderConfig segmentLoaderConfig = Mockito.mock(SegmentLoaderConfig.class);
+    Mockito.when(segmentLoaderConfig.isLazyLoadOnStart()).thenReturn(true);
+
+    // should throw an exception here
+    new SegmentStatsMonitor(druidServerConfig, segmentLoadDropMgr, segmentLoaderConfig);
+  }
+  @Test
+  public void testSimple()
+  {
+    final SegmentRowCountDistribution segmentRowCountDistribution = new SegmentRowCountDistribution();
+    segmentRowCountDistribution.addRowCountToDistribution(100_000L);
+
+    Mockito.when(segmentLoadDropMgr.getAverageNumOfRowsPerSegmentForDatasource())
+           .thenReturn(ImmutableMap.of(DATA_SOURCE, 100_000L));
+    Mockito.when(segmentLoadDropMgr.getRowCountDistributionPerDatasource())
+           .thenReturn(ImmutableMap.of(DATA_SOURCE, segmentRowCountDistribution));
+
+    ArgumentCaptor<ServiceEventBuilder<?>> eventArgumentCaptor = ArgumentCaptor.forClass(ServiceEventBuilder.class);
+    monitor.doMonitor(serviceEmitter);
+    Mockito.verify(serviceEmitter, Mockito.atLeastOnce()).emit(eventArgumentCaptor.capture());
+
+    List<Map<String, Object>> eventsAsMaps = getEventMaps(eventArgumentCaptor.getAllValues());
+    Map<String, Map<String, Object>> actual = metricKeyedMap(eventsAsMaps);
+
+    List<ServiceEventBuilder<?>> expectedEvents = new ArrayList<>();
+    expectedEvents.add(averageRowCountEvent(100_000L));
+    expectedEvents.add(rowCountRangeEvent("1-10k", 0));
+    expectedEvents.add(rowCountRangeEvent("10k-2M", 1));
+    expectedEvents.add(rowCountRangeEvent("2M-4M", 0));
+    expectedEvents.add(rowCountRangeEvent("4M-6M", 0));
+    expectedEvents.add(rowCountRangeEvent("6M-8M", 0));
+    expectedEvents.add(rowCountRangeEvent("8M-10M", 0));
+    expectedEvents.add(rowCountRangeEvent("10M+", 0));
+
+    List<Map<String, Object>> expectedEventsAsMap = getEventMaps(expectedEvents);
+    Map<String, Map<String, Object>> expected = metricKeyedMap(expectedEventsAsMap);
+
+    Assert.assertEquals("different number of metrics were returned", expected.size(), actual.size());
+    for (Map.Entry<String, Map<String, Object>> expectedKeyedEntry : expected.entrySet()) {
+      Map<String, Object> actualValue = actual.get(expectedKeyedEntry.getKey());
+      assertMetricMapsEqual(expectedKeyedEntry.getKey(), expectedKeyedEntry.getValue(), actualValue);
+    }
+  }
+
+  @Test
+  public void testZeroAndTombstoneDistribution()
+  {
+    final SegmentRowCountDistribution segmentRowCountDistribution = new SegmentRowCountDistribution();
+    segmentRowCountDistribution.addRowCountToDistribution(100_000L);
+    segmentRowCountDistribution.addRowCountToDistribution(0L);
+    segmentRowCountDistribution.addTombstoneToDistribution();
+    segmentRowCountDistribution.addTombstoneToDistribution();
+
+    Mockito.when(segmentLoadDropMgr.getAverageNumOfRowsPerSegmentForDatasource())
+           .thenReturn(ImmutableMap.of(DATA_SOURCE, 50_000L));
+    Mockito.when(segmentLoadDropMgr.getRowCountDistributionPerDatasource())
+           .thenReturn(ImmutableMap.of(DATA_SOURCE, segmentRowCountDistribution));
+
+    ArgumentCaptor<ServiceEventBuilder<?>> eventArgumentCaptor = ArgumentCaptor.forClass(ServiceEventBuilder.class);
+    monitor.doMonitor(serviceEmitter);
+    Mockito.verify(serviceEmitter, Mockito.atLeastOnce()).emit(eventArgumentCaptor.capture());
+
+    List<Map<String, Object>> eventsAsMaps = getEventMaps(eventArgumentCaptor.getAllValues());
+    Map<String, Map<String, Object>> actual = metricKeyedMap(eventsAsMaps);
+
+    List<ServiceEventBuilder<?>> expectedEvents = new ArrayList<>();
+    expectedEvents.add(averageRowCountEvent(50_000L));
+    expectedEvents.add(rowCountRangeEvent("0", 1));
+    expectedEvents.add(rowCountRangeEvent("Tombstone", 2));
+    expectedEvents.add(rowCountRangeEvent("1-10k", 0));
+    expectedEvents.add(rowCountRangeEvent("10k-2M", 1));
+    expectedEvents.add(rowCountRangeEvent("2M-4M", 0));
+    expectedEvents.add(rowCountRangeEvent("4M-6M", 0));
+    expectedEvents.add(rowCountRangeEvent("6M-8M", 0));
+    expectedEvents.add(rowCountRangeEvent("8M-10M", 0));
+    expectedEvents.add(rowCountRangeEvent("10M+", 0));
+
+    List<Map<String, Object>> expectedEventsAsMap = getEventMaps(expectedEvents);
+    Map<String, Map<String, Object>> expected = metricKeyedMap(expectedEventsAsMap);
+
+    Assert.assertEquals("different number of metrics were returned", expected.size(), actual.size());
+    for (Map.Entry<String, Map<String, Object>> expectedKeyedEntry : expected.entrySet()) {
+      Map<String, Object> actualValue = actual.get(expectedKeyedEntry.getKey());
+      assertMetricMapsEqual(expectedKeyedEntry.getKey(), expectedKeyedEntry.getValue(), actualValue);
+    }
+  }
+
+  private void assertMetricMapsEqual(String messagePrefix, Map<String, Object> expected, Map<String, Object> actual)
+  {
+    Assert.assertEquals("different number of expected values for metrics", expected.size(), actual.size());
+    for (Map.Entry<String, Object> expectedMetricEntry : expected.entrySet()) {
+      Assert.assertEquals(
+          messagePrefix + " " + expectedMetricEntry.getKey(),
+          expectedMetricEntry.getValue(),
+          actual.get(expectedMetricEntry.getKey())
+      );
+    }
+  }
+
+  @Nonnull
+  private List<Map<String, Object>> getEventMaps(List<ServiceEventBuilder<?>> eventBuilders)
+  {
+    return eventBuilders.stream()
+                        .map(eventBuilder -> new HashMap<>(eventBuilder.build(ImmutableMap.of()).toMap()))
+                        .peek(mappedValues -> mappedValues.remove("timestamp"))
+                        .collect(Collectors.toList());
+  }
+
+  private Map<String, Map<String, Object>> metricKeyedMap(List<Map<String, Object>> eventsAsMaps)
+  {
+    return eventsAsMaps.stream()
+                       .collect(
+                           Collectors.toMap(eventMap -> {
+                             String metricName = eventMap.get("metric").toString();
+                             String range = eventMap.getOrDefault("range", "").toString();
+                             return metricName + range;
+                           }, Function.identity())
+                       );
+  }
+  private ServiceEventBuilder<ServiceMetricEvent> averageRowCountEvent(Number value)
+  {
+    return new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, DATA_SOURCE)
+                                           .setDimension("tier", TIER)
+                                           .setDimension("priority", String.valueOf(PRIORITY))
+                                           .build("segment/rowCount/avg", value);
+  }
+
+  private ServiceEventBuilder<ServiceMetricEvent> rowCountRangeEvent(String range, Number value)
+  {
+    return new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, DATA_SOURCE)
+                                           .setDimension("tier", TIER)
+                                           .setDimension("priority", String.valueOf(PRIORITY))
+                                           .setDimension("range", range)
+                                           .build("segment/rowCount/range/count", value);
+  }
+}