diff --git a/common/src/main/java/io/druid/collections/CountingMap.java b/common/src/main/java/io/druid/collections/CountingMap.java
deleted file mode 100644
index e8bc8453126..00000000000
--- a/common/src/main/java/io/druid/collections/CountingMap.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to Metamarkets Group Inc. (Metamarkets) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. Metamarkets licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package io.druid.collections;
-
-import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableMap;
-
-import io.druid.java.util.common.guava.DefaultingHashMap;
-
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- */
-public class CountingMap<K> extends DefaultingHashMap<K, AtomicLong>
-{
-  public CountingMap()
-  {
-    super(new Supplier<AtomicLong>()
-    {
-      @Override
-      public AtomicLong get()
-      {
-        return new AtomicLong(0);
-      }
-    });
-  }
-
-  public long add(K key, long value)
-  {
-    return get(key).addAndGet(value);
-  }
-
-  public Map<K, Long> snapshot()
-  {
-    final ImmutableMap.Builder<K, Long> builder = ImmutableMap.builder();
-
-    for (Map.Entry<K, AtomicLong> entry : entrySet()) {
-      builder.put(entry.getKey(), entry.getValue().get());
-    }
-
-    return builder.build();
-  }
-}
diff --git a/common/src/test/java/io/druid/collections/CountingMapTest.java b/common/src/test/java/io/druid/collections/CountingMapTest.java
deleted file mode 100644
index 1aa1e5a2c91..00000000000
--- a/common/src/test/java/io/druid/collections/CountingMapTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to Metamarkets Group Inc. (Metamarkets) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. Metamarkets licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package io.druid.collections;
-
-import com.google.common.collect.ImmutableMap;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-public class CountingMapTest
-{
-  private CountingMap mapObject = null ;
-
-  @Before
-  public void setUp()
-  {
-    mapObject = new CountingMap();
-  }
-
-  @After
-  public void tearDown()
-  {
-    mapObject.clear();
-  }
-
-  @Test
-  public void testAdd()
-  {
-    long defaultValue = 10;
-    String defaultKey = "defaultKey";
-    long actual;
-    actual = mapObject.add(defaultKey,defaultValue);
-    Assert.assertEquals("Values does not match", defaultValue, actual);
-  }
-
-  @Test
-  public void testSnapshot()
-  {
-    long defaultValue = 10;
-    String defaultKey = "defaultKey";
-    mapObject.add(defaultKey, defaultValue);
-    ImmutableMap snapShotMap = (ImmutableMap) mapObject.snapshot();
-    Assert.assertEquals("Maps size does not match",mapObject.size(),snapShotMap.size());
-    long expected = (long) snapShotMap.get(defaultKey);
-    AtomicLong actual = (AtomicLong) mapObject.get(defaultKey);
-    Assert.assertEquals("Values for key = " + defaultKey + " does not match", expected, actual.longValue());
-  }
-}
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java
index d9aa2223826..c703b0380f2 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java
@@ -23,16 +23,12 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 
-import io.druid.collections.CountingMap;
 import io.druid.data.input.InputRow;
 import io.druid.indexer.HadoopDruidIndexerConfig;
 import io.druid.indexer.JobHelper;
 import io.druid.java.util.common.ISE;
-import io.druid.java.util.common.Pair;
 import io.druid.java.util.common.logger.Logger;
 
 import org.apache.hadoop.conf.Configuration;
@@ -56,8 +52,8 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
 {
@@ -72,7 +68,10 @@ public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
   {
     Configuration conf = context.getConfiguration();
-    String segmentsStr = Preconditions.checkNotNull(conf.get(CONF_INPUT_SEGMENTS), "No segments found to read");
+    String segmentsStr = Preconditions.checkNotNull(
+        conf.get(CONF_INPUT_SEGMENTS),
+        "No segments found to read"
+    );
     List<WindowedDataSegment> segments = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(
         segmentsStr,
         new TypeReference<List<WindowedDataSegment>>()
         {
@@ -91,7 +90,7 @@ public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
     for (WindowedDataSegment segment : segments) {
       totalSize += segment.getSegment().getSize();
     }
-    int mapTask = ((JobConf)conf).getNumMapTasks();
+    int mapTask = ((JobConf) conf).getNumMapTasks();
     if (mapTask > 0) {
       maxSize = totalSize / mapTask;
     }
@@ -159,7 +158,8 @@
       //and not consider the splitting.
       //also without this, isSplitable(..) fails with NPE because compressionCodecs is not properly setup.
       @Override
-      protected boolean isSplitable(FileSystem fs, Path file) {
+      protected boolean isSplitable(FileSystem fs, Path file)
+      {
         return false;
       }
 
@@ -191,64 +191,63 @@ public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
       JobConf conf
   )
   {
-    String[] locations = null;
-    try {
-      locations = getFrequentLocations(segments, fio, conf);
-    }
-    catch (Exception e) {
-      logger.error(e, "Exception thrown finding location of splits");
-    }
+    String[] locations = getFrequentLocations(getLocations(segments, fio, conf));
+
     return new DatasourceInputSplit(segments, locations);
   }
 
-  private String[] getFrequentLocations(
-      List<WindowedDataSegment> segments,
-      org.apache.hadoop.mapred.InputFormat fio,
-      JobConf conf
-  ) throws IOException
+  @VisibleForTesting
+  static Stream<String> getLocations(
+      final List<WindowedDataSegment> segments,
+      final org.apache.hadoop.mapred.InputFormat fio,
+      final JobConf conf
+  )
   {
-    Iterable<String> locations = Collections.emptyList();
-    for (WindowedDataSegment segment : segments) {
-      FileInputFormat.setInputPaths(conf, new Path(JobHelper.getURIFromSegment(segment.getSegment())));
-      for (org.apache.hadoop.mapred.InputSplit split : fio.getSplits(conf, 1)) {
-        locations = Iterables.concat(locations, Arrays.asList(split.getLocations()));
-      }
-    }
-    return getFrequentLocations(locations);
-  }
-
-  private static String[] getFrequentLocations(Iterable<String> hosts)
-  {
-
-    final CountingMap<String> counter = new CountingMap<>();
-    for (String location : hosts) {
-      counter.add(location, 1);
-    }
-
-    final TreeSet<Pair<Long, String>> sorted = Sets.<Pair<Long, String>>newTreeSet(
-        new Comparator<Pair<Long, String>>()
-        {
-          @Override
-          public int compare(Pair<Long, String> o1, Pair<Long, String> o2)
-          {
-            int compare = o2.lhs.compareTo(o1.lhs); // descending
-            if (compare == 0) {
-              compare = o1.rhs.compareTo(o2.rhs); // ascending
-            }
-            return compare;
+    return segments.stream().sequential().flatMap(
+        (final WindowedDataSegment segment) -> {
+          FileInputFormat.setInputPaths(
+              conf,
+              new Path(JobHelper.getURIFromSegment(segment.getSegment()))
+          );
+          try {
+            return Arrays.stream(fio.getSplits(conf, 1)).flatMap(
+                (final org.apache.hadoop.mapred.InputSplit split) -> {
+                  try {
+                    return Arrays.stream(split.getLocations());
+                  }
+                  catch (final IOException e) {
+                    logger.error(e, "Exception getting locations");
+                    return Stream.empty();
+                  }
+                }
+            );
+          }
+          catch (final IOException e) {
+            logger.error(e, "Exception getting splits");
+            return Stream.empty();
           }
         }
     );
+  }
 
-    for (Map.Entry<String, AtomicLong> entry : counter.entrySet()) {
-      sorted.add(Pair.of(entry.getValue().get(), entry.getKey()));
-    }
+  @VisibleForTesting
+  static String[] getFrequentLocations(final Stream<String> locations)
+  {
+    final Map<String, Long> locationCountMap = locations.collect(
+        Collectors.groupingBy(location -> location, Collectors.counting())
+    );
 
-    // use default replication factor, if possible
-    final List<String> locations = Lists.newArrayListWithCapacity(3);
-    for (Pair<Long, String> frequent : Iterables.limit(sorted, 3)) {
-      locations.add(frequent.rhs);
-    }
-    return locations.toArray(new String[locations.size()]);
+    final Comparator<Map.Entry<String, Long>> valueComparator =
+        Map.Entry.comparingByValue(Comparator.reverseOrder());
+
+    final Comparator<Map.Entry<String, Long>> keyComparator =
+        Map.Entry.comparingByKey();
+
+    return locationCountMap
+        .entrySet().stream()
+        .sorted(valueComparator.thenComparing(keyComparator))
+        .limit(3)
+        .map(Map.Entry::getKey)
+        .toArray(String[]::new);
   }
 }
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputFormatTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputFormatTest.java
index 3ad982369df..616d364a045 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputFormatTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputFormatTest.java
@@ -53,6 +53,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Stream;
 
 /**
  */
@@ -323,4 +324,107 @@ public class DatasourceInputFormatTest
   {
     Assert.assertTrue(new DatasourceInputFormat().createRecordReader(null, null) instanceof DatasourceRecordReader);
   }
+
+  @Test
+  public void testGetFrequentLocationsEmpty()
+  {
+    Assert.assertArrayEquals(
+        new String[0],
+        DatasourceInputFormat.getFrequentLocations(Stream.empty())
+    );
+  }
+
+  @Test
+  public void testGetFrequentLocationsLessThan3()
+  {
+    Assert.assertArrayEquals(
+        new String[]{"s1", "s2"},
+        DatasourceInputFormat.getFrequentLocations(Stream.of("s2", "s1"))
+    );
+  }
+
+  @Test
+  public void testGetFrequentLocationsMoreThan3()
+  {
+    Assert.assertArrayEquals(
+        new String[]{"s3", "s1", "s2"},
+        DatasourceInputFormat.getFrequentLocations(
+            Stream.of("s3", "e", "s2", "s3", "s4", "s3", "s1", "s3", "s2", "s1")
+        )
+    );
+  }
+
+  @Test
+  public void testGetLocationsInputFormatException() throws IOException
+  {
+    final org.apache.hadoop.mapred.InputFormat fio = EasyMock.mock(
+        org.apache.hadoop.mapred.InputFormat.class
+    );
+
+    EasyMock.expect(fio.getSplits(config, 1)).andThrow(new IOException("testing"));
+    EasyMock.replay(fio);
+
+    Assert.assertEquals(
+        0,
+        DatasourceInputFormat.getLocations(segments.subList(0, 1), fio, config).count()
+    );
+  }
+
+  @Test
+  public void testGetLocationsSplitException() throws IOException
+  {
+    final org.apache.hadoop.mapred.InputFormat fio = EasyMock.mock(
+        org.apache.hadoop.mapred.InputFormat.class
+    );
+
+    final org.apache.hadoop.mapred.InputSplit split = EasyMock.mock(
+        org.apache.hadoop.mapred.InputSplit.class
+    );
+
+    EasyMock.expect(fio.getSplits(config, 1)).andReturn(
+        new org.apache.hadoop.mapred.InputSplit[] {split}
+    );
+    EasyMock.expect(split.getLocations()).andThrow(new IOException("testing"));
+
+    EasyMock.replay(fio, split);
+
+    Assert.assertEquals(
+        0,
+        DatasourceInputFormat.getLocations(segments.subList(0, 1), fio, config).count()
+    );
+  }
+
+  @Test
+  public void testGetLocations() throws IOException
+  {
+    final org.apache.hadoop.mapred.InputFormat fio = EasyMock.mock(
+        org.apache.hadoop.mapred.InputFormat.class
+    );
+
+    final org.apache.hadoop.mapred.InputSplit split = EasyMock.mock(
+        org.apache.hadoop.mapred.InputSplit.class
+    );
+
+    EasyMock.expect(fio.getSplits(config, 1)).andReturn(
+        new org.apache.hadoop.mapred.InputSplit[] {split}
+    );
+    EasyMock.expect(split.getLocations()).andReturn(new String[] {"s1", "s2"});
+
+    EasyMock.expect(fio.getSplits(config, 1)).andReturn(
+        new org.apache.hadoop.mapred.InputSplit[] {split}
+    );
+    EasyMock.expect(split.getLocations()).andReturn(new String[] {"s3"});
+
+    EasyMock.expect(fio.getSplits(config, 1)).andReturn(
+        new org.apache.hadoop.mapred.InputSplit[] {split}
+    );
+    EasyMock.expect(split.getLocations()).andReturn(new String[] {"s4", "s2"});
+
+    EasyMock.replay(fio, split);
+
+    Assert.assertArrayEquals(
+        new String[] {"s1", "s2", "s3", "s4", "s2"},
+        DatasourceInputFormat.getLocations(segments, fio, config).toArray(String[]::new)
+    );
+  }
 }
diff --git a/java-util/src/main/java/io/druid/java/util/common/collect/CountingMap.java b/java-util/src/main/java/io/druid/java/util/common/collect/CountingMap.java
deleted file mode 100644
index 1fbdc8894ea..00000000000
--- a/java-util/src/main/java/io/druid/java/util/common/collect/CountingMap.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to Metamarkets Group Inc. (Metamarkets) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. Metamarkets licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package io.druid.java.util.common.collect;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Maps;
-
-import java.util.AbstractMap;
-import java.util.HashMap;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
-// Can't find a good way to abstract over which counter representation is used,
-// so I just pick Long/MutableLong.
-public class CountingMap<K> extends AbstractMap<K, Long>
-{
-  private final HashMap<K, AtomicLong> counts = new HashMap<>();
-
-  public void add(K k, Long n)
-  {
-    if (!counts.containsKey(k)) {
-      counts.put(k, new AtomicLong(0));
-    }
-    counts.get(k).addAndGet(n);
-  }
-
-  @Override
-  public Set<Entry<K, Long>> entrySet()
-  {
-    return Maps.transformValues(
-        counts,
-        new Function<AtomicLong, Long>()
-        {
-          @Override
-          public Long apply(AtomicLong n)
-          {
-            return n.get();
-          }
-        }
-    ).entrySet();
-  }
-}
diff --git a/server/pom.xml b/server/pom.xml
index a6898e8bcf6..4f263448ab3 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -181,6 +181,10 @@
         <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-math3</artifactId>
        </dependency>
+        <dependency>
+            <groupId>it.unimi.dsi</groupId>
+            <artifactId>fastutil</artifactId>
+        </dependency>
diff --git a/server/src/main/java/io/druid/server/coordinator/CoordinatorStats.java b/server/src/main/java/io/druid/server/coordinator/CoordinatorStats.java
index 5ff43529931..4589b141fc2 100644
--- a/server/src/main/java/io/druid/server/coordinator/CoordinatorStats.java
+++ b/server/src/main/java/io/druid/server/coordinator/CoordinatorStats.java
@@ -20,64 +20,98 @@
 package io.druid.server.coordinator;
 
 import com.google.common.collect.Maps;
-import io.druid.collections.CountingMap;
+import it.unimi.dsi.fastutil.objects.Object2LongMap;
+import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
 
+import java.util.Collections;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.Set;
+import java.util.function.ObjLongConsumer;
 
 /**
 */
 public class CoordinatorStats
 {
-  private final Map<String, CountingMap<String>> perTierStats;
-  private final CountingMap<String> globalStats;
+  private final Map<String, Object2LongOpenHashMap<String>> perTierStats;
+  private final Object2LongOpenHashMap<String> globalStats;
 
   public CoordinatorStats()
   {
     perTierStats = Maps.newHashMap();
-    globalStats = new CountingMap<String>();
+    globalStats = new Object2LongOpenHashMap<>();
   }
 
-  public Map<String, CountingMap<String>> getPerTierStats()
+  public boolean hasPerTierStats()
   {
-    return perTierStats;
+    return !perTierStats.isEmpty();
   }
 
-  public CountingMap<String> getGlobalStats()
+  public Set<String> getTiers(final String statName)
   {
-    return globalStats;
-  }
-
-  public void addToTieredStat(String statName, String tier, long value)
-  {
-    CountingMap<String> theStat = perTierStats.get(statName);
+    final Object2LongOpenHashMap<String> theStat = perTierStats.get(statName);
     if (theStat == null) {
-      theStat = new CountingMap<String>();
-      perTierStats.put(statName, theStat);
+      return Collections.emptySet();
     }
-    theStat.add(tier, value);
+    return Collections.unmodifiableSet(theStat.keySet());
   }
 
-  public void addToGlobalStat(String statName, long value)
+  /**
+   *
+   * @param statName the name of the statistics
+   * @param tier the tier
+   * @return the value for the statistics {@code statName} under {@code tier} tier
+   * @throws NullPointerException if {@code statName} is not found
+   */
+  public long getTieredStat(final String statName, final String tier)
   {
-    globalStats.add(statName, value);
+    return perTierStats.get(statName).getLong(tier);
   }
 
-  public CoordinatorStats accumulate(CoordinatorStats stats)
+  public void forEachTieredStat(final String statName, final ObjLongConsumer<String> consumer)
   {
-    for (Map.Entry<String, CountingMap<String>> entry : stats.perTierStats.entrySet()) {
-      CountingMap<String> theStat = perTierStats.get(entry.getKey());
-      if (theStat == null) {
-        theStat = new CountingMap<String>();
-        perTierStats.put(entry.getKey(), theStat);
-      }
-      for (Map.Entry<String, AtomicLong> tiers : entry.getValue().entrySet()) {
-        theStat.add(tiers.getKey(), tiers.getValue().get());
+    final Object2LongOpenHashMap<String> theStat = perTierStats.get(statName);
+    if (theStat != null) {
+      for (final Object2LongMap.Entry<String> entry : theStat.object2LongEntrySet()) {
+        consumer.accept(entry.getKey(), entry.getLongValue());
       }
     }
-    for (Map.Entry<String, AtomicLong> entry : stats.globalStats.entrySet()) {
-      globalStats.add(entry.getKey(), entry.getValue().get());
+  }
+
+  public long getGlobalStat(final String statName)
+  {
+    return globalStats.getLong(statName);
+  }
+
+  public void addToTieredStat(final String statName, final String tier, final long value)
+  {
+    perTierStats.computeIfAbsent(statName, ignored -> new Object2LongOpenHashMap<>())
+                .addTo(tier, value);
+  }
+
+  public void addToGlobalStat(final String statName, final long value)
+  {
+    globalStats.addTo(statName, value);
+  }
+
+  public CoordinatorStats accumulate(final CoordinatorStats stats)
+  {
+    stats.perTierStats.forEach(
+        (final String statName, final Object2LongOpenHashMap<String> urStat) -> {
+
+          final Object2LongOpenHashMap<String> myStat = perTierStats.computeIfAbsent(
+              statName, ignored -> new Object2LongOpenHashMap<>()
+          );
+
+          for (final Object2LongMap.Entry<String> entry : urStat.object2LongEntrySet()) {
+            myStat.addTo(entry.getKey(), entry.getLongValue());
+          }
+        }
+    );
+
+    for (final Object2LongMap.Entry<String> entry : stats.globalStats.object2LongEntrySet()) {
+      globalStats.addTo(entry.getKey(), entry.getLongValue());
     }
+
     return this;
   }
 }
diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java
index 23413af1581..4ca6333b3d3 100644
--- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java
@@ -39,7 +39,6 @@ import io.druid.client.ImmutableDruidDataSource;
 import io.druid.client.ImmutableDruidServer;
 import io.druid.client.ServerInventoryView;
 import io.druid.client.indexing.IndexingServiceClient;
-import io.druid.collections.CountingMap;
 import io.druid.common.config.JacksonConfigManager;
 import io.druid.concurrent.Execs;
 import io.druid.curator.discovery.ServiceAnnouncer;
@@ -70,6 +69,8 @@ import io.druid.server.coordinator.rules.Rule;
 import io.druid.server.initialization.ZkPathsConfig;
 import io.druid.server.lookup.cache.LookupCoordinatorManager;
 import io.druid.timeline.DataSegment;
+import it.unimi.dsi.fastutil.objects.Object2LongMap;
+import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.leader.LeaderLatch;
 import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
@@ -230,65 +231,67 @@ public class DruidCoordinator
     return loadManagementPeons;
   }
 
-  public Map<String, CountingMap<String>> getReplicationStatus()
+  public Map<String, Object2LongMap<String>> getReplicationStatus()
   {
-    final Map<String, CountingMap<String>> retVal = Maps.newHashMap();
+    final Map<String, Object2LongMap<String>> retVal = Maps.newHashMap();
+
     if (segmentReplicantLookup == null) {
       return retVal;
     }
 
     final DateTime now = new DateTime();
-    for (DataSegment segment : getAvailableDataSegments()) {
-      List<Rule> rules = metadataRuleManager.getRulesWithDefault(segment.getDataSource());
-      for (Rule rule : rules) {
-        if (rule instanceof LoadRule && rule.appliesTo(segment, now)) {
-          for (Map.Entry<String, Integer> entry : ((LoadRule) rule).getTieredReplicants().entrySet()) {
-            CountingMap<String> dataSourceMap = retVal.get(entry.getKey());
-            if (dataSourceMap == null) {
-              dataSourceMap = new CountingMap<>();
-              retVal.put(entry.getKey(), dataSourceMap);
-            }
-            int diff = Math.max(
-                entry.getValue() - segmentReplicantLookup.getTotalReplicants(segment.getIdentifier(), entry.getKey()),
-                0
-            );
-            dataSourceMap.add(segment.getDataSource(), diff);
-          }
-          break;
+
+    for (final DataSegment segment : getAvailableDataSegments()) {
+      final List<Rule> rules = metadataRuleManager.getRulesWithDefault(segment.getDataSource());
+
+      for (final Rule rule : rules) {
+        if (!(rule instanceof LoadRule && rule.appliesTo(segment, now))) {
+          continue;
         }
+
+        ((LoadRule) rule)
+            .getTieredReplicants()
+            .forEach(
+                (final String tier, final Integer ruleReplicants) -> {
+                  int currentReplicants = segmentReplicantLookup.getTotalReplicants(segment.getIdentifier(), tier);
+                  retVal
+                      .computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>())
+                      .addTo(segment.getDataSource(), Math.max(ruleReplicants - currentReplicants, 0));
+                }
+            );
       }
     }
 
     return retVal;
   }
 
-  public CountingMap<String> getSegmentAvailability()
+
+  public Object2LongMap<String> getSegmentAvailability()
   {
-    final CountingMap<String> retVal = new CountingMap<>();
+    final Object2LongOpenHashMap<String> retVal = new Object2LongOpenHashMap<>();
 
     if (segmentReplicantLookup == null) {
       return retVal;
     }
 
     for (DataSegment segment : getAvailableDataSegments()) {
-      int available = (segmentReplicantLookup.getTotalReplicants(segment.getIdentifier()) == 0) ? 0 : 1;
-      retVal.add(segment.getDataSource(), 1 - available);
+      if (segmentReplicantLookup.getTotalReplicants(segment.getIdentifier()) == 0) {
+        retVal.addTo(segment.getDataSource(), 1);
+      } else {
+        retVal.addTo(segment.getDataSource(), 0);
+      }
     }
 
     return retVal;
   }
 
-  CountingMap<String> getLoadPendingDatasources()
+  boolean hasLoadPending(final String dataSource)
  {
-    final CountingMap<String> retVal = new CountingMap<>();
-    for (LoadQueuePeon peon : loadManagementPeons.values()) {
-      for (DataSegment segment : peon.getSegmentsToLoad()) {
-        retVal.add(segment.getDataSource(), 1);
-      }
-    }
-    return retVal;
+    return loadManagementPeons
+        .values()
+        .stream()
+        .flatMap((final LoadQueuePeon peon) -> peon.getSegmentsToLoad().stream())
+        .anyMatch((final DataSegment segment) -> segment.getDataSource().equals(dataSource));
   }
 
   public Map<String, Double> getLoadStatus()
diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorLogger.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorLogger.java
index c8b722adb8a..4ab3870c0f1 100644
--- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorLogger.java
+++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorLogger.java
@@ -19,13 +19,11 @@
 
 package io.druid.server.coordinator.helper;
 
-import com.google.common.collect.Maps;
 import com.google.common.collect.MinMaxPriorityQueue;
 import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.client.DruidDataSource;
 import io.druid.client.ImmutableDruidServer;
-import io.druid.collections.CountingMap;
 import io.druid.java.util.common.logger.Logger;
 import io.druid.query.DruidMetrics;
 import io.druid.server.coordinator.CoordinatorStats;
@@ -35,10 +33,11 @@ import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import io.druid.server.coordinator.LoadQueuePeon;
 import io.druid.server.coordinator.ServerHolder;
 import io.druid.timeline.DataSegment;
+import it.unimi.dsi.fastutil.objects.Object2LongMap;
 
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
 */
@@ -47,29 +46,38 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper
   private static final Logger log = new Logger(DruidCoordinatorLogger.class);
   private final DruidCoordinator coordinator;
 
-  public DruidCoordinatorLogger(DruidCoordinator coordinator) {
+  public DruidCoordinatorLogger(DruidCoordinator coordinator)
+  {
     this.coordinator = coordinator;
   }
 
-  private void emitTieredStats(
+  private void emitTieredStat(
       final ServiceEmitter emitter,
       final String metricName,
-      final Map<String, ? extends Number> statMap
+      final String tier,
+      final double value
   )
   {
-    if (statMap != null) {
-      for (Map.Entry<String, ? extends Number> entry : statMap.entrySet()) {
-        String tier = entry.getKey();
-        Number value = entry.getValue();
-        emitter.emit(
-            new ServiceMetricEvent.Builder()
-                .setDimension(DruidMetrics.TIER, tier)
-                .build(
-                    metricName, value.doubleValue()
-                )
-        );
-      }
-    }
+    emitter.emit(
+        new ServiceMetricEvent.Builder()
+            .setDimension(DruidMetrics.TIER, tier)
+            .build(metricName, value)
+    );
+  }
+
+  private void emitTieredStats(
+      final ServiceEmitter emitter,
+      final String metricName,
+      final CoordinatorStats stats,
+      final String statName
+  )
+  {
+    stats.forEachTieredStat(
+        statName,
+        (final String tier, final long count) -> {
+          emitTieredStat(emitter, metricName, tier, count);
+        }
+    );
   }
 
   @Override
@@ -79,113 +87,89 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper
     CoordinatorStats stats = params.getCoordinatorStats();
     ServiceEmitter emitter = params.getEmitter();
 
-    Map<String, AtomicLong> assigned = stats.getPerTierStats().get("assignedCount");
-    if (assigned != null) {
-      for (Map.Entry<String, AtomicLong> entry : assigned.entrySet()) {
-        log.info(
-            "[%s] : Assigned %s segments among %,d servers",
-            entry.getKey(), entry.getValue().get(), cluster.getHistoricalsByTier(entry.getKey()).size()
-        );
-      }
-    }
+    stats.forEachTieredStat(
+        "assignedCount",
+        (final String tier, final long count) -> {
+          log.info(
+              "[%s] : Assigned %s segments among %,d servers",
+              tier, count, cluster.getHistoricalsByTier(tier).size()
+          );
 
-    emitTieredStats(
-        emitter, "segment/assigned/count",
-        assigned
+          emitTieredStat(emitter, "segment/assigned/count", tier, count);
+        }
     );
 
-    Map<String, AtomicLong> dropped = stats.getPerTierStats().get("droppedCount");
-    if (dropped != null) {
-      for (Map.Entry<String, AtomicLong> entry : dropped.entrySet()) {
-        log.info(
-            "[%s] : Dropped %s segments among %,d servers",
-            entry.getKey(), entry.getValue().get(), cluster.getHistoricalsByTier(entry.getKey()).size()
-        );
-      }
-    }
+    stats.forEachTieredStat(
+        "droppedCount",
+        (final String tier, final long count) -> {
+          log.info(
+              "[%s] : Dropped %s segments among %,d servers",
+              tier, count, cluster.getHistoricalsByTier(tier).size()
+          );
 
-    emitTieredStats(
-        emitter, "segment/dropped/count",
-        dropped
+          emitTieredStat(emitter, "segment/dropped/count", tier, count);
+        }
     );
 
     emitTieredStats(
         emitter, "segment/cost/raw",
-        stats.getPerTierStats().get("initialCost")
+        stats, "initialCost"
     );
 
     emitTieredStats(
         emitter, "segment/cost/normalization",
-        stats.getPerTierStats().get("normalization")
+        stats, "normalization"
     );
 
     emitTieredStats(
         emitter, "segment/moved/count",
-        stats.getPerTierStats().get("movedCount")
+        stats, "movedCount"
     );
 
     emitTieredStats(
         emitter, "segment/deleted/count",
-        stats.getPerTierStats().get("deletedCount")
+        stats, "deletedCount"
     );
 
-    Map<String, AtomicLong> normalized = stats.getPerTierStats().get("normalizedInitialCostTimesOneThousand");
-    if (normalized != null) {
-      emitTieredStats(
-          emitter, "segment/cost/normalized",
-          Maps.transformEntries(
-              normalized,
-              new Maps.EntryTransformer<String, AtomicLong, Number>()
-              {
-                @Override
-                public Number transformEntry(String key, AtomicLong value)
-                {
-                  return value.doubleValue() / 1000d;
-                }
-              }
-          )
-      );
-    }
+    stats.forEachTieredStat(
+        "normalizedInitialCostTimesOneThousand",
+        (final String tier, final long count) -> {
+          emitTieredStat(emitter, "segment/cost/normalized", tier, count / 1000d);
+        }
+    );
 
-    Map<String, AtomicLong> unneeded = stats.getPerTierStats().get("unneededCount");
-    if (unneeded != null) {
-      for (Map.Entry<String, AtomicLong> entry : unneeded.entrySet()) {
-        log.info(
-            "[%s] : Removed %s unneeded segments among %,d servers",
-            entry.getKey(), entry.getValue().get(), cluster.getHistoricalsByTier(entry.getKey()).size()
-        );
-      }
-    }
-
-    emitTieredStats(
-        emitter, "segment/unneeded/count",
-        stats.getPerTierStats().get("unneededCount")
+    stats.forEachTieredStat(
+        "unneededCount",
+        (final String tier, final long count) -> {
+          log.info(
+              "[%s] : Removed %s unneeded segments among %,d servers",
+              tier, count, cluster.getHistoricalsByTier(tier).size()
+          );
+          emitTieredStat(emitter, "segment/unneeded/count", tier, count);
+        }
     );
 
     emitter.emit(
         new ServiceMetricEvent.Builder().build(
-            "segment/overShadowed/count", stats.getGlobalStats().get("overShadowedCount")
+            "segment/overShadowed/count",
+            stats.getGlobalStat("overShadowedCount")
         )
     );
 
-    Map<String, AtomicLong> moved = stats.getPerTierStats().get("movedCount");
-    if (moved != null) {
-      for (Map.Entry<String, AtomicLong> entry : moved.entrySet()) {
-        log.info(
-            "[%s] : Moved %,d segment(s)",
-            entry.getKey(), entry.getValue().get()
-        );
-      }
-    }
-    final Map<String, AtomicLong> unmoved = stats.getPerTierStats().get("unmovedCount");
-    if (unmoved != null) {
-      for(Map.Entry<String, AtomicLong> entry : unmoved.entrySet()) {
-        log.info(
-            "[%s] : Let alone %,d segment(s)",
-            entry.getKey(), entry.getValue().get()
-        );
-      }
-    }
+    stats.forEachTieredStat(
+        "movedCount",
+        (final String tier, final long count) -> {
+          log.info("[%s] : Moved %,d segment(s)", tier, count);
+        }
+    );
+
+    stats.forEachTieredStat(
+        "unmovedCount",
+        (final String tier, final long count) -> {
+          log.info("[%s] : Let alone %,d segment(s)", tier, count);
+        }
+    );
+
     log.info("Load Queues:");
     for (MinMaxPriorityQueue<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
       for (ServerHolder serverHolder : serverHolders) {
@@ -213,91 +197,92 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper
     }
 
     // Emit coordinator metrics
-    final Set<Map.Entry<String, LoadQueuePeon>> peonEntries = params.getLoadManagementPeons().entrySet();
-    for (Map.Entry<String, LoadQueuePeon> entry : peonEntries) {
-      String serverName = entry.getKey();
-      LoadQueuePeon queuePeon = entry.getValue();
-      emitter.emit(
-          new ServiceMetricEvent.Builder()
-              .setDimension(DruidMetrics.SERVER, serverName).build(
-              "segment/loadQueue/size", queuePeon.getLoadQueueSize()
-          )
-      );
-      emitter.emit(
-          new ServiceMetricEvent.Builder()
-              .setDimension(DruidMetrics.SERVER, serverName).build(
-              "segment/loadQueue/failed", queuePeon.getAndResetFailedAssignCount()
-          )
-      );
-      emitter.emit(
-          new ServiceMetricEvent.Builder()
-              .setDimension(DruidMetrics.SERVER, serverName).build(
-              "segment/loadQueue/count", queuePeon.getSegmentsToLoad().size()
-          )
-      );
-      emitter.emit(
-          new ServiceMetricEvent.Builder()
-              .setDimension(DruidMetrics.SERVER, serverName).build(
-              "segment/dropQueue/count", queuePeon.getSegmentsToDrop().size()
-          )
-      );
-    }
-    for (Map.Entry<String, AtomicLong> entry : coordinator.getSegmentAvailability().entrySet()) {
-      String datasource = entry.getKey();
-      Long count = entry.getValue().get();
-      emitter.emit(
-          new ServiceMetricEvent.Builder()
-              .setDimension(DruidMetrics.DATASOURCE, datasource).build(
-              "segment/unavailable/count", count
-          )
-      );
-    }
-    for (Map.Entry<String, CountingMap<String>> entry : coordinator.getReplicationStatus().entrySet()) {
-      String tier = entry.getKey();
-      CountingMap<String> datasourceAvailabilities = entry.getValue();
-      for (Map.Entry<String, AtomicLong> datasourceAvailability : datasourceAvailabilities.entrySet()) {
-        String datasource = datasourceAvailability.getKey();
-        Long count = datasourceAvailability.getValue().get();
-        emitter.emit(
-            new ServiceMetricEvent.Builder()
-                .setDimension(DruidMetrics.TIER, tier)
-                .setDimension(DruidMetrics.DATASOURCE, datasource).build(
-                "segment/underReplicated/count", count
-            )
+    params
+        .getLoadManagementPeons()
+        .forEach(
+            (final String serverName, final LoadQueuePeon queuePeon) -> {
+              emitter.emit(
+                  new ServiceMetricEvent.Builder()
+                      .setDimension(DruidMetrics.SERVER, serverName).build(
+                      "segment/loadQueue/size", queuePeon.getLoadQueueSize()
+                  )
+              );
+              emitter.emit(
+                  new ServiceMetricEvent.Builder()
+                      .setDimension(DruidMetrics.SERVER, serverName).build(
+                      "segment/loadQueue/failed", queuePeon.getAndResetFailedAssignCount()
+                  )
+              );
+              emitter.emit(
+                  new ServiceMetricEvent.Builder()
+                      .setDimension(DruidMetrics.SERVER, serverName).build(
+                      "segment/loadQueue/count", queuePeon.getSegmentsToLoad().size()
+                  )
+              );
+              emitter.emit(
+                  new ServiceMetricEvent.Builder()
+                      .setDimension(DruidMetrics.SERVER, serverName).build(
+                      "segment/dropQueue/count", queuePeon.getSegmentsToDrop().size()
+                  )
+              );
+            }
         );
-      }
-    }
+
+    coordinator.getSegmentAvailability().object2LongEntrySet().forEach(
+        (final Object2LongMap.Entry<String> entry) -> {
+          final String dataSource = entry.getKey();
+          final long count = entry.getLongValue();
+          emitter.emit(
+              new ServiceMetricEvent.Builder()
+                  .setDimension(DruidMetrics.DATASOURCE, dataSource).build(
+                  "segment/unavailable/count", count
+              )
+          );
+        }
+    );
+
+    coordinator.getReplicationStatus().forEach(
+        (final String tier, final Object2LongMap<String> status) -> {
+          for (final Object2LongMap.Entry<String> entry : status.object2LongEntrySet()) {
+            final String dataSource = entry.getKey();
+            final long count = entry.getLongValue();
+
+            emitter.emit(
+                new ServiceMetricEvent.Builder()
+                    .setDimension(DruidMetrics.TIER, tier)
+                    .setDimension(DruidMetrics.DATASOURCE, dataSource).build(
+                    "segment/underReplicated/count", count
+                )
+            );
+          }
+        }
+    );
 
     // Emit segment metrics
-    CountingMap<String> segmentSizes = new CountingMap<String>();
-    CountingMap<String> segmentCounts = new CountingMap<String>();
-    for (DruidDataSource dataSource : params.getDataSources()) {
-      for (DataSegment segment : dataSource.getSegments()) {
-        segmentSizes.add(dataSource.getName(), segment.getSize());
-        segmentCounts.add(dataSource.getName(), 1L);
-      }
-    }
-    for (Map.Entry<String, Long> entry : segmentSizes.snapshot().entrySet()) {
-      String dataSource = entry.getKey();
-      Long size = entry.getValue();
-      emitter.emit(
-          new ServiceMetricEvent.Builder()
-              .setDimension(DruidMetrics.DATASOURCE, dataSource).build(
-              "segment/size", size
-          )
-      );
-    }
-    for (Map.Entry<String, Long> entry : segmentCounts.snapshot().entrySet()) {
-      String dataSource = entry.getKey();
-      Long count = entry.getValue();
-      emitter.emit(
-          new ServiceMetricEvent.Builder()
-              .setDimension(DruidMetrics.DATASOURCE, dataSource).build(
-              "segment/count", count
-          )
-      );
-    }
+    final Stream<DataSegment> allSegments = params
+        .getDataSources()
+        .stream()
+        .flatMap((final DruidDataSource dataSource) -> dataSource.getSegments().stream());
+
+    allSegments
+        .collect(Collectors.groupingBy(DataSegment::getDataSource))
+        .forEach(
+            (final String name, final List<DataSegment> segments) -> {
+              final long size = segments.stream().mapToLong(DataSegment::getSize).sum();
+              emitter.emit(
+                  new ServiceMetricEvent.Builder()
+                      .setDimension(DruidMetrics.DATASOURCE, name).build(
+                      "segment/size", size
+                  )
+              );
+              emitter.emit(
+                  new ServiceMetricEvent.Builder()
+                      .setDimension(DruidMetrics.DATASOURCE, name).build(
+                      "segment/count", segments.size()
+                  )
+              );
+            }
        );
 
     return params;
   }
diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentMerger.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentMerger.java
index ca2459f4707..275758f4b2f 100644
--- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentMerger.java
+++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentMerger.java
@@ -130,11 +130,11 @@ public class DruidCoordinatorSegmentMerger implements DruidCoordinatorHelper
       }
     }
 
-    log.info("Issued merge requests for %s segments", stats.getGlobalStats().get("mergedCount").get());
+    log.info("Issued merge requests for %s segments", stats.getGlobalStat("mergedCount"));
 
     params.getEmitter().emit(
         new ServiceMetricEvent.Builder().build(
-            "coordinator/merge/count", stats.getGlobalStats().get("mergedCount")
+            "coordinator/merge/count", stats.getGlobalStat("mergedCount")
         )
     );
 
diff --git a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java
index 6d83e4260d7..3ce2dcb7593 100644
--- a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java
+++ b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java
@@ -83,7 +83,7 @@ public abstract class LoadRule implements Rule
             segment
         );
         stats.accumulate(assignStats);
-        totalReplicantsInCluster += assignStats.getPerTierStats().get(ASSIGNED_COUNT).get(tier).get();
+        totalReplicantsInCluster += assignStats.getTieredStat(ASSIGNED_COUNT, tier);
       }
 
       loadStatus.put(tier, expectedReplicantsInTier - loadedReplicantsInTier);
diff --git a/server/src/main/java/io/druid/server/metrics/HistoricalMetricsMonitor.java b/server/src/main/java/io/druid/server/metrics/HistoricalMetricsMonitor.java
index ec389cf53bb..f6bd6e2e605 100644
--- a/server/src/main/java/io/druid/server/metrics/HistoricalMetricsMonitor.java
+++ b/server/src/main/java/io/druid/server/metrics/HistoricalMetricsMonitor.java
@@ -24,11 +24,12 @@ import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import com.metamx.metrics.AbstractMonitor;
 import io.druid.client.DruidServerConfig;
-import io.druid.java.util.common.collect.CountingMap;
 import io.druid.query.DruidMetrics;
 import io.druid.server.SegmentManager;
 import io.druid.server.coordination.ZkCoordinator;
 import io.druid.timeline.DataSegment;
+import it.unimi.dsi.fastutil.objects.Object2LongMap;
+import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
 
 import java.util.Map;
@@ -55,15 +56,16 @@ public class HistoricalMetricsMonitor extends AbstractMonitor
   {
     emitter.emit(new ServiceMetricEvent.Builder().build("segment/max", serverConfig.getMaxSize()));
 
-    final CountingMap<String> pendingDeleteSizes = new CountingMap<String>();
+    final Object2LongOpenHashMap<String> pendingDeleteSizes = new Object2LongOpenHashMap<>();
     for (DataSegment segment : zkCoordinator.getPendingDeleteSnapshot()) {
-      pendingDeleteSizes.add(segment.getDataSource(), segment.getSize());
+      pendingDeleteSizes.addTo(segment.getDataSource(), segment.getSize());
     }
 
-    for (Map.Entry<String, Long> entry : pendingDeleteSizes.entrySet()) {
+    for (final Object2LongMap.Entry<String> entry : pendingDeleteSizes.object2LongEntrySet()) {
+
       final String dataSource = entry.getKey();
-      final long pendingDeleteSize = entry.getValue();
+      final long pendingDeleteSize = entry.getLongValue();
       emitter.emit(
           new ServiceMetricEvent.Builder()
               .setDimension(DruidMetrics.DATASOURCE, dataSource)
diff --git a/server/src/test/java/io/druid/server/coordinator/CoordinatorStatsTest.java b/server/src/test/java/io/druid/server/coordinator/CoordinatorStatsTest.java
new file mode 100644
index 00000000000..ea33c5301f4
--- /dev/null
+++ b/server/src/test/java/io/druid/server/coordinator/CoordinatorStatsTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.server.coordinator;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class CoordinatorStatsTest
+{
+  private CoordinatorStats stats;
+
+  @Before
+  public void setUp() throws Exception
+  {
+    stats = new CoordinatorStats();
+  }
+
+  @After
+  public void tearDown() throws Exception
+  {
+    stats = null;
+  }
+
+  @Test
+  public void addToGlobalStat() throws Exception
+  {
+    Assert.assertEquals(0, stats.getGlobalStat("stats"));
+    stats.addToGlobalStat("stats", 1);
+    Assert.assertEquals(1, stats.getGlobalStat("stats"));
+    stats.addToGlobalStat("stats", -11);
+    Assert.assertEquals(-10, stats.getGlobalStat("stats"));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testAddToTieredStatNonexistentStat() throws Exception
+  {
+    stats.getTieredStat("stat", "tier");
+  }
+
+  @Test
+  public void testAddToTieredStat() throws Exception
+  {
+    Assert.assertFalse(stats.hasPerTierStats());
+    stats.addToTieredStat("stat1", "tier1", 1);
+    stats.addToTieredStat("stat1", "tier2", 1);
+    stats.addToTieredStat("stat1", "tier1", -5);
+    stats.addToTieredStat("stat2", "tier1", 1);
+    stats.addToTieredStat("stat1", "tier2", 1);
+    Assert.assertTrue(stats.hasPerTierStats());
+
+    Assert.assertEquals(
+        Sets.newHashSet("tier1", "tier2"),
+        stats.getTiers("stat1")
+    );
+    Assert.assertEquals(
+        Sets.newHashSet("tier1"),
+        stats.getTiers("stat2")
+    );
+    Assert.assertTrue(stats.getTiers("stat3").isEmpty());
+
+    Assert.assertEquals(-4, stats.getTieredStat("stat1", "tier1"));
+    Assert.assertEquals(2, stats.getTieredStat("stat1", "tier2"));
+    Assert.assertEquals(1, stats.getTieredStat("stat2", "tier1"));
+  }
+
+  @Test
+  public void testForEachTieredStat() throws Exception
+  {
+    final Map<String, Long> expected = ImmutableMap.of(
+        "tier1", 1L,
+        "tier2", 2L,
+        "tier3", 3L
+    );
+    final Map<String, Long> actual = Maps.newHashMap();
+
+    expected.forEach(
+        (tier, count) -> stats.addToTieredStat("stat", tier, count)
+    );
+
+    stats.forEachTieredStat("stat0", (tier, count) -> Assert.fail());
+    stats.forEachTieredStat("stat", actual::put);
+
+    Assert.assertEquals(expected, actual);
+  }
+
+
+  @Test
+  public void testAccumulate() throws Exception
+  {
+    stats.addToGlobalStat("stat1", 1);
+    stats.addToGlobalStat("stat2", 1);
+    stats.addToTieredStat("stat1", "tier1", 1);
+    stats.addToTieredStat("stat1", "tier2", 1);
+    stats.addToTieredStat("stat2", "tier1", 1);
+
+    final CoordinatorStats stats2 = new CoordinatorStats();
+    stats2.addToGlobalStat("stat1", 1);
+    stats2.addToTieredStat("stat1", "tier2", 1);
+    stats2.addToTieredStat("stat2", "tier2", 1);
+    stats2.addToTieredStat("stat3", "tier1", 1);
+
+    stats.accumulate(stats2);
+
+    Assert.assertEquals(2, stats.getGlobalStat("stat1"));
+    Assert.assertEquals(1, stats.getGlobalStat("stat2"));
+    Assert.assertEquals(1, stats.getTieredStat("stat1", "tier1"));
+    Assert.assertEquals(2, stats.getTieredStat("stat1", "tier2"));
+    Assert.assertEquals(1, stats.getTieredStat("stat2", "tier1"));
+    Assert.assertEquals(1, stats.getTieredStat("stat2", "tier2"));
+    Assert.assertEquals(1, stats.getTieredStat("stat3", "tier1"));
+  }
+}
diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java
index d699f2369df..557dc3cb8aa 100644
--- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java
+++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java
@@ -211,8 +211,8 @@ public class DruidCoordinatorBalancerTest
         .build();
 
     params = new DruidCoordinatorBalancerTester(coordinator).run(params);
-    Assert.assertTrue(params.getCoordinatorStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
-    Assert.assertTrue(params.getCoordinatorStats().getPerTierStats().get("movedCount").get("normal").get() < segments.size());
+    Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0);
+    Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") < segments.size());
     exec.shutdown();
   }
 
@@ -292,7 +292,7 @@ public class DruidCoordinatorBalancerTest
         .build();
 
     params = new DruidCoordinatorBalancerTester(coordinator).run(params);
-    Assert.assertTrue(params.getCoordinatorStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
+    Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0);
     exec.shutdown();
   }
 
@@ -394,7 +394,7 @@ public class DruidCoordinatorBalancerTest
         .build();
 
     params = new DruidCoordinatorBalancerTester(coordinator).run(params);
-    Assert.assertTrue(params.getCoordinatorStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
+    Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0);
     exec.shutdown();
   }
diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
index b540115580b..f59fbdcb246 100644
--- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
+++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
@@ -203,11 +203,11 @@ public class DruidCoordinatorRuleRunnerTest
     DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
     CoordinatorStats stats = afterParams.getCoordinatorStats();
 
-    Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 6);
-    Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("normal").get() == 6);
-    Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("cold").get() == 12);
-    Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
-    Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
+    Assert.assertEquals(6L, stats.getTieredStat("assignedCount", "hot"));
+    Assert.assertEquals(6L, stats.getTieredStat("assignedCount", "normal"));
+    Assert.assertEquals(12L, stats.getTieredStat("assignedCount", "cold"));
+    Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
+    Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
 
     exec.shutdown();
     EasyMock.verify(mockPeon);
@@ -305,10 +305,10 @@ public class DruidCoordinatorRuleRunnerTest
     DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
     CoordinatorStats stats = afterParams.getCoordinatorStats();
 
-    Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 12);
-    Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("cold").get() == 18);
-    Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
-    Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
+    Assert.assertEquals(12L, stats.getTieredStat("assignedCount", "hot"));
+    Assert.assertEquals(18L, stats.getTieredStat("assignedCount", "cold"));
+    Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
+    Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
 
     exec.shutdown();
     EasyMock.verify(mockPeon);
@@ -402,10 +402,10 @@ public class DruidCoordinatorRuleRunnerTest
     DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
     CoordinatorStats stats = afterParams.getCoordinatorStats();
 
-    Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 12);
-    Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("normal").get() == 0);
-    Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
-    Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
+    Assert.assertEquals(12L, stats.getTieredStat("assignedCount", "hot"));
+    Assert.assertEquals(0L, stats.getTieredStat("assignedCount", "normal"));
+    Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
+    Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
 
     exec.shutdown();
     EasyMock.verify(mockPeon);
@@ -602,7 +602,7 @@ public class DruidCoordinatorRuleRunnerTest
     DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
     CoordinatorStats stats = afterParams.getCoordinatorStats();
 
-    Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12);
+    Assert.assertEquals(12L, stats.getGlobalStat("deletedCount"));
 
     exec.shutdown();
     EasyMock.verify(coordinator);
@@ -687,8 +687,8 @@ public class DruidCoordinatorRuleRunnerTest
     DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
     CoordinatorStats stats = afterParams.getCoordinatorStats();
 
-    Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 1);
-    Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12);
+    Assert.assertEquals(1L, stats.getTieredStat("droppedCount", "normal"));
+    Assert.assertEquals(12L, stats.getGlobalStat("deletedCount"));
 
     exec.shutdown();
     EasyMock.verify(mockPeon);
@@ -779,8 +779,8 @@ public class DruidCoordinatorRuleRunnerTest
     DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
     CoordinatorStats stats = afterParams.getCoordinatorStats();
 
-    Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 1);
-    Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12);
+    Assert.assertEquals(1L, stats.getTieredStat("droppedCount", "normal"));
+    Assert.assertEquals(12L, stats.getGlobalStat("deletedCount"));
 
     exec.shutdown();
     EasyMock.verify(mockPeon);
@@ -867,8 +867,8 @@ public class DruidCoordinatorRuleRunnerTest
     DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
     CoordinatorStats stats = afterParams.getCoordinatorStats();
 
-    Assert.assertTrue(stats.getPerTierStats().get("droppedCount") == null);
-    Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12);
+    Assert.assertTrue(stats.getTiers("droppedCount").isEmpty());
+    Assert.assertEquals(12L, stats.getGlobalStat("deletedCount"));
 
     exec.shutdown();
     EasyMock.verify(mockPeon);
@@ -968,7 +968,7 @@ public class DruidCoordinatorRuleRunnerTest
     DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
     CoordinatorStats stats = afterParams.getCoordinatorStats();
 
-    Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 1);
+    Assert.assertEquals(1L, stats.getTieredStat("droppedCount", "normal"));
 
     exec.shutdown();
     EasyMock.verify(mockPeon);
@@ -1049,9 +1049,9 @@ public class DruidCoordinatorRuleRunnerTest
     DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
     CoordinatorStats stats = afterParams.getCoordinatorStats();
 
-    Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 48);
-    Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
-    Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
+    Assert.assertEquals(48L, stats.getTieredStat("assignedCount", "hot"));
+    Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
+    Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
 
     DataSegment overFlowSegment = new DataSegment(
         "test",
@@ -1078,9 +1078,9 @@ public class DruidCoordinatorRuleRunnerTest
     );
     stats = afterParams.getCoordinatorStats();
 
-    Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 1);
-    Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
-    Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
+    Assert.assertEquals(1L, stats.getTieredStat("assignedCount", "hot"));
+    Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
+    Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
 
     EasyMock.verify(mockPeon);
     exec.shutdown();
@@ -1180,10 +1180,10 @@ public class DruidCoordinatorRuleRunnerTest
     DruidCoordinatorRuntimeParams afterParams = runner.run(params);
     CoordinatorStats stats = afterParams.getCoordinatorStats();
 
-    Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 24);
-    Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get(DruidServer.DEFAULT_TIER).get() == 7);
-    Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
-    Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
+    Assert.assertEquals(24L, stats.getTieredStat("assignedCount", "hot"));
+    Assert.assertEquals(7L, stats.getTieredStat("assignedCount", DruidServer.DEFAULT_TIER));
+    Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
+    Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
 
     EasyMock.verify(mockPeon);
     exec.shutdown();
@@ -1283,7 +1283,7 @@ public class DruidCoordinatorRuleRunnerTest
     CoordinatorStats stats = afterParams.getCoordinatorStats();
 
     // There is no throttling on drop
-    Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 25);
+    Assert.assertEquals(25L, stats.getTieredStat("droppedCount", "normal"));
 
     EasyMock.verify(mockPeon);
     exec.shutdown();
@@ -1371,10 +1371,10 @@ public class DruidCoordinatorRuleRunnerTest
     DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
     CoordinatorStats stats = afterParams.getCoordinatorStats();
 
-    Assert.assertEquals(1, stats.getPerTierStats().get("assignedCount").size());
-    Assert.assertEquals(1, stats.getPerTierStats().get("assignedCount").get("_default_tier").get());
-    Assert.assertNull(stats.getPerTierStats().get("unassignedCount"));
-    Assert.assertNull(stats.getPerTierStats().get("unassignedSize"));
+    Assert.assertEquals(1, stats.getTiers("assignedCount").size());
+    Assert.assertEquals(1, stats.getTieredStat("assignedCount", "_default_tier"));
+    Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
+    Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
 
     Assert.assertEquals(2, availableSegments.size());
     Assert.assertEquals(availableSegments, params.getAvailableSegments());
diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java
index ecd6c5904f8..4d8fdb7f08a 100644
--- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java
+++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java
@@ -31,7 +31,6 @@ import io.druid.client.DruidServer;
 import io.druid.client.ImmutableDruidDataSource;
 import io.druid.client.ImmutableDruidServer;
 import io.druid.client.SingleServerInventoryView;
-import io.druid.collections.CountingMap;
 import io.druid.common.config.JacksonConfigManager;
 import io.druid.concurrent.Execs;
 import io.druid.curator.CuratorTestBase;
@@ -49,6 +48,7 @@ import io.druid.server.initialization.ZkPathsConfig;
 import io.druid.server.lookup.cache.LookupCoordinatorManager;
 import io.druid.server.metrics.NoopServiceEmitter;
 import io.druid.timeline.DataSegment;
+import it.unimi.dsi.fastutil.objects.Object2LongMap;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
@@ -330,14 +330,14 @@ public class DruidCoordinatorTest extends CuratorTestBase
     Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getLoadStatus());
     curator.delete().guaranteed().forPath(ZKPaths.makePath(LOADPATH, dataSegment.getIdentifier()));
 
     // Wait for coordinator thread to run so that replication status is updated
-    while (coordinator.getSegmentAvailability().snapshot().get(dataSource) != 0) {
+    while (coordinator.getSegmentAvailability().getLong(dataSource) != 0) {
       Thread.sleep(50);
     }
-    Map<String, Long> segmentAvailability = coordinator.getSegmentAvailability().snapshot();
+    Map<String, Long> segmentAvailability = coordinator.getSegmentAvailability();
     Assert.assertEquals(1, segmentAvailability.size());
     Assert.assertEquals(0L, segmentAvailability.get(dataSource));
 
-    while (coordinator.getLoadPendingDatasources().get(dataSource).get() > 0) {
+    while (coordinator.hasLoadPending(dataSource)) {
       Thread.sleep(50);
     }
 
@@ -348,17 +348,17 @@ public class DruidCoordinatorTest extends CuratorTestBase
       Thread.sleep(100);
     }
 
-    Map<String, CountingMap<String>> replicationStatus = coordinator.getReplicationStatus();
+    Map<String, Object2LongMap<String>> replicationStatus = coordinator.getReplicationStatus();
     Assert.assertNotNull(replicationStatus);
     Assert.assertEquals(1, replicationStatus.entrySet().size());
 
-    CountingMap<String> dataSourceMap = replicationStatus.get(tier);
+    Object2LongMap<String> dataSourceMap = replicationStatus.get(tier);
     Assert.assertNotNull(dataSourceMap);
     Assert.assertEquals(1, dataSourceMap.size());
     Assert.assertNotNull(dataSourceMap.get(dataSource));
 
     // Simulated the adding of segment to druidServer during SegmentChangeRequestLoad event
     // The load rules asks for 2 replicas, therefore 1 replica should still be pending
-    Assert.assertEquals(1L, dataSourceMap.get(dataSource).get());
+    Assert.assertEquals(1L, dataSourceMap.getLong(dataSource));
 
     coordinator.stop();
     leaderUnannouncerLatch.await();
diff --git a/server/src/test/java/io/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java b/server/src/test/java/io/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
index c490df5cd9d..fe461117bd0 100644
--- a/server/src/test/java/io/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
+++ b/server/src/test/java/io/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
@@ -233,8 +233,8 @@ public class BroadcastDistributionRuleTest
         smallSegment
     );
 
-    assertEquals(3, stats.getGlobalStats().get(LoadRule.ASSIGNED_COUNT).intValue());
-    assertTrue(stats.getPerTierStats().isEmpty());
+    assertEquals(3L, stats.getGlobalStat(LoadRule.ASSIGNED_COUNT));
+    assertEquals(false, stats.hasPerTierStats());
 
     assertTrue(
         holdersOfLargeSegments.stream()
@@ -273,8 +273,8 @@ public class BroadcastDistributionRuleTest
         smallSegment
     );
 
-    assertEquals(5, stats.getGlobalStats().get(LoadRule.ASSIGNED_COUNT).intValue());
-    assertTrue(stats.getPerTierStats().isEmpty());
+    assertEquals(5L, stats.getGlobalStat(LoadRule.ASSIGNED_COUNT));
+    assertEquals(false, stats.hasPerTierStats());
 
     assertTrue(
         holdersOfLargeSegments.stream()
@@ -311,8 +311,8 @@ public class BroadcastDistributionRuleTest
         smallSegment
     );
 
-    assertEquals(6, stats.getGlobalStats().get(LoadRule.ASSIGNED_COUNT).intValue());
-    assertTrue(stats.getPerTierStats().isEmpty());
+    assertEquals(6L, stats.getGlobalStat(LoadRule.ASSIGNED_COUNT));
+    assertEquals(false, stats.hasPerTierStats());
 
     assertTrue(
         druidCluster.getAllServers().stream()
diff --git a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java
index 37e114d09df..64d91e7c124 100644
--- a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java
+++ b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java
@@ -213,8 +213,8 @@ public class LoadRuleTest
         segment
     );
 
-    Assert.assertTrue(stats.getPerTierStats().get(LoadRule.ASSIGNED_COUNT).get("hot").get() == 1);
-    Assert.assertTrue(stats.getPerTierStats().get(LoadRule.ASSIGNED_COUNT).get(DruidServer.DEFAULT_TIER).get() == 2);
+    Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot"));
+    Assert.assertEquals(2L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, DruidServer.DEFAULT_TIER));
 
     exec.shutdown();
   }
@@ -324,8 +324,8 @@ public class LoadRuleTest
         segment
     );
 
-    Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("hot").get() == 1);
-    Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get(DruidServer.DEFAULT_TIER).get() == 1);
+    Assert.assertEquals(1L, stats.getTieredStat("droppedCount", "hot"));
+    Assert.assertEquals(1L, stats.getTieredStat("droppedCount", DruidServer.DEFAULT_TIER));
 
     exec.shutdown();
   }
@@ -415,7 +415,7 @@ public class LoadRuleTest
         segment
     );
 
-    Assert.assertTrue(stats.getPerTierStats().get(LoadRule.ASSIGNED_COUNT).get("hot").get() == 1);
+    Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot"));
 
     exec.shutdown();
   }
@@ -521,7 +521,7 @@ public class LoadRuleTest
         segment
     );
 
-    Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("hot").get() == 1);
+    Assert.assertEquals(1L, stats.getTieredStat("droppedCount", "hot"));
 
    exec.shutdown();
  }
}