Replace usages of CountingMap with Object2LongMap (#4320)

* Replaces use of CountingMap with Object2LongMap from fastutil (a minimal sketch of the API change follows the commit details below).

* Removed the CountingMap classes and made minor fixes.

* Added additional test cases for DatasourceInputFormat.

* Added additional test cases for CoordinatorStats.

* Not materializing segment list.

* Put in a fix for a test that was failing on its expected behavior.

* Added missing header.
Authored by Goh Wei Xiang on 2017-05-24 17:40:33 -07:00; committed by Fangjin Yang
parent b578adacae
commit b77fab8a30
18 changed files with 634 additions and 555 deletions
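
To make the swap concrete, here is a minimal sketch contrasting the two counting APIs (class and key names are illustrative; it assumes fastutil is on the classpath):

import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;

public class CountingExample
{
  public static void main(String[] args)
  {
    // Before: CountingMap<String> held boxed AtomicLong values, so every
    // increment and read went through a mutable holder object:
    //   counter.add("tier1", 5L);
    //   counter.get("tier1").get();
    // After: Object2LongOpenHashMap stores primitive long values directly.
    final Object2LongOpenHashMap<String> counter = new Object2LongOpenHashMap<>();
    counter.addTo("tier1", 5L); // accumulates into the primitive value
    counter.addTo("tier1", 3L);
    System.out.println(counter.getLong("tier1")); // 8; absent keys default to 0
    for (final Object2LongMap.Entry<String> e : counter.object2LongEntrySet()) {
      System.out.println(e.getKey() + " = " + e.getLongValue());
    }
  }
}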

View File

@@ -1,61 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.collections;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import io.druid.java.util.common.guava.DefaultingHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
*/
public class CountingMap<K> extends DefaultingHashMap<K, AtomicLong>
{
public CountingMap()
{
super(new Supplier<AtomicLong>()
{
@Override
public AtomicLong get()
{
return new AtomicLong(0);
}
});
}
public long add(K key, long value)
{
return get(key).addAndGet(value);
}
public Map<K, Long> snapshot()
{
final ImmutableMap.Builder<K, Long> builder = ImmutableMap.builder();
for (Map.Entry<K, AtomicLong> entry : entrySet()) {
builder.put(entry.getKey(), entry.getValue().get());
}
return builder.build();
}
}

View File

@@ -1,68 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.collections;
import com.google.common.collect.ImmutableMap;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicLong;
public class CountingMapTest
{
private CountingMap mapObject = null ;
@Before
public void setUp()
{
mapObject = new CountingMap();
}
@After
public void tearDown()
{
mapObject.clear();
}
@Test
public void testAdd()
{
long defaultValue = 10;
String defaultKey = "defaultKey";
long actual;
actual = mapObject.add(defaultKey,defaultValue);
Assert.assertEquals("Values does not match", defaultValue, actual);
}
@Test
public void testSnapshot()
{
long defaultValue = 10;
String defaultKey = "defaultKey";
mapObject.add(defaultKey, defaultValue);
ImmutableMap snapShotMap = (ImmutableMap) mapObject.snapshot();
Assert.assertEquals("Maps size does not match",mapObject.size(),snapShotMap.size());
long expected = (long) snapShotMap.get(defaultKey);
AtomicLong actual = (AtomicLong) mapObject.get(defaultKey);
Assert.assertEquals("Values for key = " + defaultKey + " does not match", expected, actual.longValue());
}
}

View File

@@ -23,16 +23,12 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.druid.collections.CountingMap;
import io.druid.data.input.InputRow;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.JobHelper;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.logger.Logger;
import org.apache.hadoop.conf.Configuration;
@@ -56,8 +52,8 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
{
@@ -72,7 +68,10 @@ public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
{
Configuration conf = context.getConfiguration();
String segmentsStr = Preconditions.checkNotNull(conf.get(CONF_INPUT_SEGMENTS), "No segments found to read");
String segmentsStr = Preconditions.checkNotNull(
conf.get(CONF_INPUT_SEGMENTS),
"No segments found to read"
);
List<WindowedDataSegment> segments = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(
segmentsStr,
new TypeReference<List<WindowedDataSegment>>()
@@ -91,7 +90,7 @@ public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
for (WindowedDataSegment segment : segments) {
totalSize += segment.getSegment().getSize();
}
int mapTask = ((JobConf)conf).getNumMapTasks();
int mapTask = ((JobConf) conf).getNumMapTasks();
if (mapTask > 0) {
maxSize = totalSize / mapTask;
}
@@ -159,7 +158,8 @@ public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
//and not consider the splitting.
//also without this, isSplitable(..) fails with NPE because compressionCodecs is not properly setup.
@Override
protected boolean isSplitable(FileSystem fs, Path file) {
protected boolean isSplitable(FileSystem fs, Path file)
{
return false;
}
@@ -191,64 +191,63 @@ public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
JobConf conf
)
{
String[] locations = null;
try {
locations = getFrequentLocations(segments, fio, conf);
}
catch (Exception e) {
logger.error(e, "Exception thrown finding location of splits");
}
String[] locations = getFrequentLocations(getLocations(segments, fio, conf));
return new DatasourceInputSplit(segments, locations);
}
private String[] getFrequentLocations(
List<WindowedDataSegment> segments,
org.apache.hadoop.mapred.InputFormat fio,
JobConf conf
) throws IOException
@VisibleForTesting
static Stream<String> getLocations(
final List<WindowedDataSegment> segments,
final org.apache.hadoop.mapred.InputFormat fio,
final JobConf conf
)
{
Iterable<String> locations = Collections.emptyList();
for (WindowedDataSegment segment : segments) {
FileInputFormat.setInputPaths(conf, new Path(JobHelper.getURIFromSegment(segment.getSegment())));
for (org.apache.hadoop.mapred.InputSplit split : fio.getSplits(conf, 1)) {
locations = Iterables.concat(locations, Arrays.asList(split.getLocations()));
}
}
return getFrequentLocations(locations);
}
private static String[] getFrequentLocations(Iterable<String> hosts)
{
final CountingMap<String> counter = new CountingMap<>();
for (String location : hosts) {
counter.add(location, 1);
}
final TreeSet<Pair<Long, String>> sorted = Sets.<Pair<Long, String>>newTreeSet(
new Comparator<Pair<Long, String>>()
{
@Override
public int compare(Pair<Long, String> o1, Pair<Long, String> o2)
{
int compare = o2.lhs.compareTo(o1.lhs); // descending
if (compare == 0) {
compare = o1.rhs.compareTo(o2.rhs); // ascending
}
return compare;
return segments.stream().sequential().flatMap(
(final WindowedDataSegment segment) -> {
FileInputFormat.setInputPaths(
conf,
new Path(JobHelper.getURIFromSegment(segment.getSegment()))
);
try {
return Arrays.stream(fio.getSplits(conf, 1)).flatMap(
(final org.apache.hadoop.mapred.InputSplit split) -> {
try {
return Arrays.stream(split.getLocations());
}
catch (final IOException e) {
logger.error(e, "Exception getting locations");
return Stream.empty();
}
}
);
}
catch (final IOException e) {
logger.error(e, "Exception getting splits");
return Stream.empty();
}
}
);
}
for (Map.Entry<String, AtomicLong> entry : counter.entrySet()) {
sorted.add(Pair.of(entry.getValue().get(), entry.getKey()));
}
@VisibleForTesting
static String[] getFrequentLocations(final Stream<String> locations)
{
final Map<String, Long> locationCountMap = locations.collect(
Collectors.groupingBy(location -> location, Collectors.counting())
);
// use default replication factor, if possible
final List<String> locations = Lists.newArrayListWithCapacity(3);
for (Pair<Long, String> frequent : Iterables.limit(sorted, 3)) {
locations.add(frequent.rhs);
}
return locations.toArray(new String[locations.size()]);
final Comparator<Map.Entry<String, Long>> valueComparator =
Map.Entry.comparingByValue(Comparator.reverseOrder());
final Comparator<Map.Entry<String, Long>> keyComparator =
Map.Entry.comparingByKey();
return locationCountMap
.entrySet().stream()
.sorted(valueComparator.thenComparing(keyComparator))
.limit(3)
.map(Map.Entry::getKey)
.toArray(String[]::new);
}
}
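
For reference, the count-then-rank step of the new getFrequentLocations can be exercised standalone. Here is a minimal sketch (the class name is made up, and the sample hosts mirror the test data in the next file); it prints s3,s1,s2:

import java.util.Comparator;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FrequentLocationsSketch
{
  public static void main(String[] args)
  {
    // Count each host, then rank by count descending with host name ascending
    // as the tie-breaker, keeping at most 3 (the default replication factor).
    final Map<String, Long> counts = Stream
        .of("s3", "e", "s2", "s3", "s4", "s3", "s1", "s3", "s2", "s1")
        .collect(Collectors.groupingBy(host -> host, Collectors.counting()));
    final Comparator<Map.Entry<String, Long>> byCountDesc =
        Map.Entry.comparingByValue(Comparator.reverseOrder());
    final String[] top = counts
        .entrySet().stream()
        .sorted(byCountDesc.thenComparing(Map.Entry.comparingByKey()))
        .limit(3)
        .map(Map.Entry::getKey)
        .toArray(String[]::new);
    System.out.println(String.join(",", top)); // s3,s1,s2
  }
}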

View File

@@ -53,6 +53,7 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
/**
*/
@@ -323,4 +324,107 @@ public class DatasourceInputFormatTest
{
Assert.assertTrue(new DatasourceInputFormat().createRecordReader(null, null) instanceof DatasourceRecordReader);
}
@Test
public void testGetFrequentLocationsEmpty()
{
Assert.assertArrayEquals(
new String[0],
DatasourceInputFormat.getFrequentLocations(Stream.empty())
);
}
@Test
public void testGetFrequentLocationsLessThan3()
{
Assert.assertArrayEquals(
new String[]{"s1", "s2"},
DatasourceInputFormat.getFrequentLocations(Stream.of("s2", "s1"))
);
}
@Test
public void testGetFrequentLocationsMoreThan3()
{
Assert.assertArrayEquals(
new String[]{"s3", "s1", "s2"},
DatasourceInputFormat.getFrequentLocations(
Stream.of("s3", "e", "s2", "s3", "s4", "s3", "s1", "s3", "s2", "s1")
)
);
}
@Test
public void testGetLocationsInputFormatException() throws IOException
{
final org.apache.hadoop.mapred.InputFormat fio = EasyMock.mock(
org.apache.hadoop.mapred.InputFormat.class
);
EasyMock.expect(fio.getSplits(config, 1)).andThrow(new IOException("testing"));
EasyMock.replay(fio);
Assert.assertEquals(
0,
DatasourceInputFormat.getLocations(segments.subList(0, 1), fio, config).count()
);
}
@Test
public void testGetLocationsSplitException() throws IOException
{
final org.apache.hadoop.mapred.InputFormat fio = EasyMock.mock(
org.apache.hadoop.mapred.InputFormat.class
);
final org.apache.hadoop.mapred.InputSplit split = EasyMock.mock(
org.apache.hadoop.mapred.InputSplit.class
);
EasyMock.expect(fio.getSplits(config, 1)).andReturn(
new org.apache.hadoop.mapred.InputSplit[] {split}
);
EasyMock.expect(split.getLocations()).andThrow(new IOException("testing"));
EasyMock.replay(fio, split);
Assert.assertEquals(
0,
DatasourceInputFormat.getLocations(segments.subList(0, 1), fio, config).count()
);
}
@Test
public void testGetLocations() throws IOException
{
final org.apache.hadoop.mapred.InputFormat fio = EasyMock.mock(
org.apache.hadoop.mapred.InputFormat.class
);
final org.apache.hadoop.mapred.InputSplit split = EasyMock.mock(
org.apache.hadoop.mapred.InputSplit.class
);
EasyMock.expect(fio.getSplits(config, 1)).andReturn(
new org.apache.hadoop.mapred.InputSplit[] {split}
);
EasyMock.expect(split.getLocations()).andReturn(new String[] {"s1", "s2"});
EasyMock.expect(fio.getSplits(config, 1)).andReturn(
new org.apache.hadoop.mapred.InputSplit[] {split}
);
EasyMock.expect(split.getLocations()).andReturn(new String[] {"s3"});
EasyMock.expect(fio.getSplits(config, 1)).andReturn(
new org.apache.hadoop.mapred.InputSplit[] {split}
);
EasyMock.expect(split.getLocations()).andReturn(new String[] {"s4", "s2"});
EasyMock.replay(fio, split);
Assert.assertArrayEquals(
new String[] {"s1", "s2", "s3", "s4", "s2"},
DatasourceInputFormat.getLocations(segments, fio, config).toArray(String[]::new)
);
}
}

View File

@@ -1,59 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.java.util.common.collect;
import com.google.common.base.Function;
import com.google.common.collect.Maps;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
// Can't find a good way to abstract over which counter representation is used,
// so I just pick Long/MutableLong.
public class CountingMap<K> extends AbstractMap<K, Long>
{
private final HashMap<K, AtomicLong> counts = new HashMap<>();
public void add(K k, Long n)
{
if (!counts.containsKey(k)) {
counts.put(k, new AtomicLong(0));
}
counts.get(k).addAndGet(n);
}
@Override
public Set<Entry<K, Long>> entrySet()
{
return Maps.transformValues(
counts,
new Function<AtomicLong, Long>()
{
@Override
public Long apply(AtomicLong n)
{
return n.get();
}
}
).entrySet();
}
}

View File

@@ -181,6 +181,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
</dependency>
<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
</dependency>
<!-- Tests -->
<dependency>

View File

@@ -20,64 +20,98 @@
package io.druid.server.coordinator;
import com.google.common.collect.Maps;
import io.druid.collections.CountingMap;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.Set;
import java.util.function.ObjLongConsumer;
/**
*/
public class CoordinatorStats
{
private final Map<String, CountingMap<String>> perTierStats;
private final CountingMap<String> globalStats;
private final Map<String, Object2LongOpenHashMap<String>> perTierStats;
private final Object2LongOpenHashMap<String> globalStats;
public CoordinatorStats()
{
perTierStats = Maps.newHashMap();
globalStats = new CountingMap<String>();
globalStats = new Object2LongOpenHashMap<>();
}
public Map<String, CountingMap<String>> getPerTierStats()
public boolean hasPerTierStats()
{
return perTierStats;
return !perTierStats.isEmpty();
}
public CountingMap<String> getGlobalStats()
public Set<String> getTiers(final String statName)
{
return globalStats;
}
public void addToTieredStat(String statName, String tier, long value)
{
CountingMap<String> theStat = perTierStats.get(statName);
final Object2LongOpenHashMap<String> theStat = perTierStats.get(statName);
if (theStat == null) {
theStat = new CountingMap<String>();
perTierStats.put(statName, theStat);
return Collections.emptySet();
}
theStat.add(tier, value);
return Collections.unmodifiableSet(theStat.keySet());
}
public void addToGlobalStat(String statName, long value)
/**
*
* @param statName the name of the statistics
* @param tier the tier
* @return the value for the statistics {@code statName} under {@code tier} tier
* @throws NullPointerException if {@code statName} is not found
*/
public long getTieredStat(final String statName, final String tier)
{
globalStats.add(statName, value);
return perTierStats.get(statName).getLong(tier);
}
public CoordinatorStats accumulate(CoordinatorStats stats)
public void forEachTieredStat(final String statName, final ObjLongConsumer<String> consumer)
{
for (Map.Entry<String, CountingMap<String>> entry : stats.perTierStats.entrySet()) {
CountingMap<String> theStat = perTierStats.get(entry.getKey());
if (theStat == null) {
theStat = new CountingMap<String>();
perTierStats.put(entry.getKey(), theStat);
}
for (Map.Entry<String, AtomicLong> tiers : entry.getValue().entrySet()) {
theStat.add(tiers.getKey(), tiers.getValue().get());
final Object2LongOpenHashMap<String> theStat = perTierStats.get(statName);
if (theStat != null) {
for (final Object2LongMap.Entry<String> entry : theStat.object2LongEntrySet()) {
consumer.accept(entry.getKey(), entry.getLongValue());
}
}
for (Map.Entry<String, AtomicLong> entry : stats.globalStats.entrySet()) {
globalStats.add(entry.getKey(), entry.getValue().get());
}
public long getGlobalStat(final String statName)
{
return globalStats.getLong(statName);
}
public void addToTieredStat(final String statName, final String tier, final long value)
{
perTierStats.computeIfAbsent(statName, ignored -> new Object2LongOpenHashMap<>())
.addTo(tier, value);
}
public void addToGlobalStat(final String statName, final long value)
{
globalStats.addTo(statName, value);
}
public CoordinatorStats accumulate(final CoordinatorStats stats)
{
stats.perTierStats.forEach(
(final String statName, final Object2LongOpenHashMap<String> urStat) -> {
final Object2LongOpenHashMap<String> myStat = perTierStats.computeIfAbsent(
statName, ignored -> new Object2LongOpenHashMap<>()
);
for (final Object2LongMap.Entry<String> entry : urStat.object2LongEntrySet()) {
myStat.addTo(entry.getKey(), entry.getLongValue());
}
}
);
for (final Object2LongMap.Entry<String> entry : stats.globalStats.object2LongEntrySet()) {
globalStats.addTo(entry.getKey(), entry.getLongValue());
}
return this;
}
}
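
A minimal standalone sketch of the accumulation pattern used above — computeIfAbsent to lazily create a stat's tier counter, plus fastutil's addTo to bump it (names here are illustrative):

import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import java.util.HashMap;
import java.util.Map;

public class TieredCounterSketch
{
  public static void main(String[] args)
  {
    final Map<String, Object2LongOpenHashMap<String>> perTierStats = new HashMap<>();
    // computeIfAbsent creates the per-stat counter only on first use; addTo
    // then accumulates into the primitive long value stored for the tier.
    perTierStats.computeIfAbsent("assignedCount", ignored -> new Object2LongOpenHashMap<>())
                .addTo("hot", 1L);
    perTierStats.computeIfAbsent("assignedCount", ignored -> new Object2LongOpenHashMap<>())
                .addTo("hot", 2L);
    System.out.println(perTierStats.get("assignedCount").getLong("hot")); // 3
  }
}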

View File

@@ -39,7 +39,6 @@ import io.druid.client.ImmutableDruidDataSource;
import io.druid.client.ImmutableDruidServer;
import io.druid.client.ServerInventoryView;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.collections.CountingMap;
import io.druid.common.config.JacksonConfigManager;
import io.druid.concurrent.Execs;
import io.druid.curator.discovery.ServiceAnnouncer;
@@ -70,6 +69,8 @@ import io.druid.server.coordinator.rules.Rule;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.lookup.cache.LookupCoordinatorManager;
import io.druid.timeline.DataSegment;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
@@ -230,65 +231,67 @@ public class DruidCoordinator
return loadManagementPeons;
}
public Map<String, CountingMap<String>> getReplicationStatus()
public Map<String, ? extends Object2LongMap<String>> getReplicationStatus()
{
final Map<String, CountingMap<String>> retVal = Maps.newHashMap();
final Map<String, Object2LongOpenHashMap<String>> retVal = Maps.newHashMap();
if (segmentReplicantLookup == null) {
return retVal;
}
final DateTime now = new DateTime();
for (DataSegment segment : getAvailableDataSegments()) {
List<Rule> rules = metadataRuleManager.getRulesWithDefault(segment.getDataSource());
for (Rule rule : rules) {
if (rule instanceof LoadRule && rule.appliesTo(segment, now)) {
for (Map.Entry<String, Integer> entry : ((LoadRule) rule).getTieredReplicants().entrySet()) {
CountingMap<String> dataSourceMap = retVal.get(entry.getKey());
if (dataSourceMap == null) {
dataSourceMap = new CountingMap<>();
retVal.put(entry.getKey(), dataSourceMap);
}
int diff = Math.max(
entry.getValue() - segmentReplicantLookup.getTotalReplicants(segment.getIdentifier(), entry.getKey()),
0
);
dataSourceMap.add(segment.getDataSource(), diff);
}
break;
for (final DataSegment segment : getAvailableDataSegments()) {
final List<Rule> rules = metadataRuleManager.getRulesWithDefault(segment.getDataSource());
for (final Rule rule : rules) {
if (!(rule instanceof LoadRule && rule.appliesTo(segment, now))) {
continue;
}
((LoadRule) rule)
.getTieredReplicants()
.forEach(
(final String tier, final Integer ruleReplicants) -> {
int currentReplicants = segmentReplicantLookup.getTotalReplicants(segment.getIdentifier(), tier);
retVal
.computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>())
.addTo(segment.getDataSource(), Math.max(ruleReplicants - currentReplicants, 0));
}
);
}
}
return retVal;
}
public CountingMap<String> getSegmentAvailability()
public Object2LongMap<String> getSegmentAvailability()
{
final CountingMap<String> retVal = new CountingMap<>();
final Object2LongOpenHashMap<String> retVal = new Object2LongOpenHashMap<>();
if (segmentReplicantLookup == null) {
return retVal;
}
for (DataSegment segment : getAvailableDataSegments()) {
int available = (segmentReplicantLookup.getTotalReplicants(segment.getIdentifier()) == 0) ? 0 : 1;
retVal.add(segment.getDataSource(), 1 - available);
if (segmentReplicantLookup.getTotalReplicants(segment.getIdentifier()) == 0) {
retVal.addTo(segment.getDataSource(), 1);
} else {
retVal.addTo(segment.getDataSource(), 0);
}
}
return retVal;
}
CountingMap<String> getLoadPendingDatasources()
boolean hasLoadPending(final String dataSource)
{
final CountingMap<String> retVal = new CountingMap<>();
for (LoadQueuePeon peon : loadManagementPeons.values()) {
for (DataSegment segment : peon.getSegmentsToLoad()) {
retVal.add(segment.getDataSource(), 1);
}
}
return retVal;
return loadManagementPeons
.values()
.stream()
.flatMap((final LoadQueuePeon peon) -> peon.getSegmentsToLoad().stream())
.anyMatch((final DataSegment segment) -> segment.getDataSource().equals(dataSource));
}
public Map<String, Double> getLoadStatus()

View File

@@ -19,13 +19,11 @@
package io.druid.server.coordinator.helper;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.DruidDataSource;
import io.druid.client.ImmutableDruidServer;
import io.druid.collections.CountingMap;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.DruidMetrics;
import io.druid.server.coordinator.CoordinatorStats;
@@ -35,10 +33,11 @@ import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.server.coordinator.LoadQueuePeon;
import io.druid.server.coordinator.ServerHolder;
import io.druid.timeline.DataSegment;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
*/
@@ -47,29 +46,38 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper
private static final Logger log = new Logger(DruidCoordinatorLogger.class);
private final DruidCoordinator coordinator;
public DruidCoordinatorLogger(DruidCoordinator coordinator) {
public DruidCoordinatorLogger(DruidCoordinator coordinator)
{
this.coordinator = coordinator;
}
private <T extends Number> void emitTieredStats(
private void emitTieredStat(
final ServiceEmitter emitter,
final String metricName,
final Map<String, T> statMap
final String tier,
final double value
)
{
if (statMap != null) {
for (Map.Entry<String, T> entry : statMap.entrySet()) {
String tier = entry.getKey();
Number value = entry.getValue();
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.TIER, tier)
.build(
metricName, value.doubleValue()
)
);
}
}
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.TIER, tier)
.build(metricName, value)
);
}
private void emitTieredStats(
final ServiceEmitter emitter,
final String metricName,
final CoordinatorStats stats,
final String statName
)
{
stats.forEachTieredStat(
statName,
(final String tier, final long count) -> {
emitTieredStat(emitter, metricName, tier, count);
}
);
}
@Override
@@ -79,113 +87,89 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper
CoordinatorStats stats = params.getCoordinatorStats();
ServiceEmitter emitter = params.getEmitter();
Map<String, AtomicLong> assigned = stats.getPerTierStats().get("assignedCount");
if (assigned != null) {
for (Map.Entry<String, AtomicLong> entry : assigned.entrySet()) {
log.info(
"[%s] : Assigned %s segments among %,d servers",
entry.getKey(), entry.getValue().get(), cluster.getHistoricalsByTier(entry.getKey()).size()
);
}
}
stats.forEachTieredStat(
"assignedCount",
(final String tier, final long count) -> {
log.info(
"[%s] : Assigned %s segments among %,d servers",
tier, count, cluster.getHistoricalsByTier(tier).size()
);
emitTieredStats(
emitter, "segment/assigned/count",
assigned
emitTieredStat(emitter, "segment/assigned/count", tier, count);
}
);
Map<String, AtomicLong> dropped = stats.getPerTierStats().get("droppedCount");
if (dropped != null) {
for (Map.Entry<String, AtomicLong> entry : dropped.entrySet()) {
log.info(
"[%s] : Dropped %s segments among %,d servers",
entry.getKey(), entry.getValue().get(), cluster.getHistoricalsByTier(entry.getKey()).size()
);
}
}
stats.forEachTieredStat(
"droppedCount",
(final String tier, final long count) -> {
log.info(
"[%s] : Dropped %s segments among %,d servers",
tier, count, cluster.getHistoricalsByTier(tier).size()
);
emitTieredStats(
emitter, "segment/dropped/count",
dropped
emitTieredStat(emitter, "segment/dropped/count", tier, count);
}
);
emitTieredStats(
emitter, "segment/cost/raw",
stats.getPerTierStats().get("initialCost")
stats, "initialCost"
);
emitTieredStats(
emitter, "segment/cost/normalization",
stats.getPerTierStats().get("normalization")
stats, "normalization"
);
emitTieredStats(
emitter, "segment/moved/count",
stats.getPerTierStats().get("movedCount")
stats, "movedCount"
);
emitTieredStats(
emitter, "segment/deleted/count",
stats.getPerTierStats().get("deletedCount")
stats, "deletedCount"
);
Map<String, AtomicLong> normalized = stats.getPerTierStats().get("normalizedInitialCostTimesOneThousand");
if (normalized != null) {
emitTieredStats(
emitter, "segment/cost/normalized",
Maps.transformEntries(
normalized,
new Maps.EntryTransformer<String, AtomicLong, Number>()
{
@Override
public Number transformEntry(String key, AtomicLong value)
{
return value.doubleValue() / 1000d;
}
}
)
);
}
stats.forEachTieredStat(
"normalizedInitialCostTimesOneThousand",
(final String tier, final long count) -> {
emitTieredStat(emitter, "segment/cost/normalized", tier, count / 1000d);
}
);
Map<String, AtomicLong> unneeded = stats.getPerTierStats().get("unneededCount");
if (unneeded != null) {
for (Map.Entry<String, AtomicLong> entry : unneeded.entrySet()) {
log.info(
"[%s] : Removed %s unneeded segments among %,d servers",
entry.getKey(), entry.getValue().get(), cluster.getHistoricalsByTier(entry.getKey()).size()
);
}
}
emitTieredStats(
emitter, "segment/unneeded/count",
stats.getPerTierStats().get("unneededCount")
stats.forEachTieredStat(
"unneededCount",
(final String tier, final long count) -> {
log.info(
"[%s] : Removed %s unneeded segments among %,d servers",
tier, count, cluster.getHistoricalsByTier(tier).size()
);
emitTieredStat(emitter, "segment/unneeded/count", tier, count);
}
);
emitter.emit(
new ServiceMetricEvent.Builder().build(
"segment/overShadowed/count", stats.getGlobalStats().get("overShadowedCount")
"segment/overShadowed/count",
stats.getGlobalStat("overShadowedCount")
)
);
Map<String, AtomicLong> moved = stats.getPerTierStats().get("movedCount");
if (moved != null) {
for (Map.Entry<String, AtomicLong> entry : moved.entrySet()) {
log.info(
"[%s] : Moved %,d segment(s)",
entry.getKey(), entry.getValue().get()
);
}
}
final Map<String, AtomicLong> unmoved = stats.getPerTierStats().get("unmovedCount");
if (unmoved != null) {
for(Map.Entry<String, AtomicLong> entry : unmoved.entrySet()) {
log.info(
"[%s] : Let alone %,d segment(s)",
entry.getKey(), entry.getValue().get()
);
}
}
stats.forEachTieredStat(
"movedCount",
(final String tier, final long count) -> {
log.info("[%s] : Moved %,d segment(s)", tier, count);
}
);
stats.forEachTieredStat(
"unmovedCount",
(final String tier, final long count) -> {
log.info("[%s] : Let alone %,d segment(s)", tier, count);
}
);
log.info("Load Queues:");
for (MinMaxPriorityQueue<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
for (ServerHolder serverHolder : serverHolders) {
@@ -213,91 +197,92 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper
}
// Emit coordinator metrics
final Set<Map.Entry<String, LoadQueuePeon>> peonEntries = params.getLoadManagementPeons().entrySet();
for (Map.Entry<String, LoadQueuePeon> entry : peonEntries) {
String serverName = entry.getKey();
LoadQueuePeon queuePeon = entry.getValue();
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.SERVER, serverName).build(
"segment/loadQueue/size", queuePeon.getLoadQueueSize()
)
);
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.SERVER, serverName).build(
"segment/loadQueue/failed", queuePeon.getAndResetFailedAssignCount()
)
);
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.SERVER, serverName).build(
"segment/loadQueue/count", queuePeon.getSegmentsToLoad().size()
)
);
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.SERVER, serverName).build(
"segment/dropQueue/count", queuePeon.getSegmentsToDrop().size()
)
);
}
for (Map.Entry<String, AtomicLong> entry : coordinator.getSegmentAvailability().entrySet()) {
String datasource = entry.getKey();
Long count = entry.getValue().get();
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DATASOURCE, datasource).build(
"segment/unavailable/count", count
)
);
}
for (Map.Entry<String, CountingMap<String>> entry : coordinator.getReplicationStatus().entrySet()) {
String tier = entry.getKey();
CountingMap<String> datasourceAvailabilities = entry.getValue();
for (Map.Entry<String, AtomicLong> datasourceAvailability : datasourceAvailabilities.entrySet()) {
String datasource = datasourceAvailability.getKey();
Long count = datasourceAvailability.getValue().get();
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.TIER, tier)
.setDimension(DruidMetrics.DATASOURCE, datasource).build(
"segment/underReplicated/count", count
)
params
.getLoadManagementPeons()
.forEach(
(final String serverName, final LoadQueuePeon queuePeon) -> {
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.SERVER, serverName).build(
"segment/loadQueue/size", queuePeon.getLoadQueueSize()
)
);
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.SERVER, serverName).build(
"segment/loadQueue/failed", queuePeon.getAndResetFailedAssignCount()
)
);
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.SERVER, serverName).build(
"segment/loadQueue/count", queuePeon.getSegmentsToLoad().size()
)
);
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.SERVER, serverName).build(
"segment/dropQueue/count", queuePeon.getSegmentsToDrop().size()
)
);
}
);
}
}
coordinator.getSegmentAvailability().object2LongEntrySet().forEach(
(final Object2LongMap.Entry<String> entry) -> {
final String dataSource = entry.getKey();
final long count = entry.getLongValue();
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource).build(
"segment/unavailable/count", count
)
);
}
);
coordinator.getReplicationStatus().forEach(
(final String tier, final Object2LongMap<String> status) -> {
for (final Object2LongMap.Entry<String> entry : status.object2LongEntrySet()) {
final String dataSource = entry.getKey();
final long count = entry.getLongValue();
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.TIER, tier)
.setDimension(DruidMetrics.DATASOURCE, dataSource).build(
"segment/underReplicated/count", count
)
);
}
}
);
// Emit segment metrics
CountingMap<String> segmentSizes = new CountingMap<String>();
CountingMap<String> segmentCounts = new CountingMap<String>();
for (DruidDataSource dataSource : params.getDataSources()) {
for (DataSegment segment : dataSource.getSegments()) {
segmentSizes.add(dataSource.getName(), segment.getSize());
segmentCounts.add(dataSource.getName(), 1L);
}
}
for (Map.Entry<String, Long> entry : segmentSizes.snapshot().entrySet()) {
String dataSource = entry.getKey();
Long size = entry.getValue();
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource).build(
"segment/size", size
)
);
}
for (Map.Entry<String, Long> entry : segmentCounts.snapshot().entrySet()) {
String dataSource = entry.getKey();
Long count = entry.getValue();
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource).build(
"segment/count", count
)
);
}
final Stream<DataSegment> allSegments = params
.getDataSources()
.stream()
.flatMap((final DruidDataSource dataSource) -> dataSource.getSegments().stream());
allSegments
.collect(Collectors.groupingBy(DataSegment::getDataSource))
.forEach(
(final String name, final List<DataSegment> segments) -> {
final long size = segments.stream().mapToLong(DataSegment::getSize).sum();
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DATASOURCE, name).build(
"segment/size", size
)
);
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DATASOURCE, name).build(
"segment/count", segments.size()
)
);
}
);
return params;
}

View File

@@ -130,11 +130,11 @@ public class DruidCoordinatorSegmentMerger implements DruidCoordinatorHelper
}
}
log.info("Issued merge requests for %s segments", stats.getGlobalStats().get("mergedCount").get());
log.info("Issued merge requests for %s segments", stats.getGlobalStat("mergedCount"));
params.getEmitter().emit(
new ServiceMetricEvent.Builder().build(
"coordinator/merge/count", stats.getGlobalStats().get("mergedCount")
"coordinator/merge/count", stats.getGlobalStat("mergedCount")
)
);

View File

@@ -83,7 +83,7 @@ public abstract class LoadRule implements Rule
segment
);
stats.accumulate(assignStats);
totalReplicantsInCluster += assignStats.getPerTierStats().get(ASSIGNED_COUNT).get(tier).get();
totalReplicantsInCluster += assignStats.getTieredStat(ASSIGNED_COUNT, tier);
}
loadStatus.put(tier, expectedReplicantsInTier - loadedReplicantsInTier);

View File

@@ -24,11 +24,12 @@ import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import com.metamx.metrics.AbstractMonitor;
import io.druid.client.DruidServerConfig;
import io.druid.java.util.common.collect.CountingMap;
import io.druid.query.DruidMetrics;
import io.druid.server.SegmentManager;
import io.druid.server.coordination.ZkCoordinator;
import io.druid.timeline.DataSegment;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import java.util.Map;
@@ -55,15 +56,16 @@
{
emitter.emit(new ServiceMetricEvent.Builder().build("segment/max", serverConfig.getMaxSize()));
final CountingMap<String> pendingDeleteSizes = new CountingMap<String>();
final Object2LongOpenHashMap<String> pendingDeleteSizes = new Object2LongOpenHashMap<>();
for (DataSegment segment : zkCoordinator.getPendingDeleteSnapshot()) {
pendingDeleteSizes.add(segment.getDataSource(), segment.getSize());
pendingDeleteSizes.addTo(segment.getDataSource(), segment.getSize());
}
for (Map.Entry<String, Long> entry : pendingDeleteSizes.entrySet()) {
for (final Object2LongMap.Entry<String> entry : pendingDeleteSizes.object2LongEntrySet()) {
final String dataSource = entry.getKey();
final long pendingDeleteSize = entry.getValue();
final long pendingDeleteSize = entry.getLongValue();
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)

View File

@@ -0,0 +1,136 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.coordinator;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Map;
public class CoordinatorStatsTest
{
private CoordinatorStats stats;
@Before
public void setUp() throws Exception
{
stats = new CoordinatorStats();
}
@After
public void tearDown() throws Exception
{
stats = null;
}
@Test
public void addToGlobalStat() throws Exception
{
Assert.assertEquals(0, stats.getGlobalStat("stats"));
stats.addToGlobalStat("stats", 1);
Assert.assertEquals(1, stats.getGlobalStat("stats"));
stats.addToGlobalStat("stats", -11);
Assert.assertEquals(-10, stats.getGlobalStat("stats"));
}
@Test(expected = NullPointerException.class)
public void testAddToTieredStatNonexistentStat() throws Exception
{
stats.getTieredStat("stat", "tier");
}
@Test
public void testAddToTieredStat() throws Exception
{
Assert.assertFalse(stats.hasPerTierStats());
stats.addToTieredStat("stat1", "tier1", 1);
stats.addToTieredStat("stat1", "tier2", 1);
stats.addToTieredStat("stat1", "tier1", -5);
stats.addToTieredStat("stat2", "tier1", 1);
stats.addToTieredStat("stat1", "tier2", 1);
Assert.assertTrue(stats.hasPerTierStats());
Assert.assertEquals(
Sets.newHashSet("tier1", "tier2"),
stats.getTiers("stat1")
);
Assert.assertEquals(
Sets.newHashSet("tier1"),
stats.getTiers("stat2")
);
Assert.assertTrue(stats.getTiers("stat3").isEmpty());
Assert.assertEquals(-4, stats.getTieredStat("stat1", "tier1"));
Assert.assertEquals(2, stats.getTieredStat("stat1", "tier2"));
Assert.assertEquals(1, stats.getTieredStat("stat2", "tier1"));
}
@Test
public void testForEachTieredStat() throws Exception
{
final Map<String, Long> expected = ImmutableMap.of(
"tier1", 1L,
"tier2", 2L,
"tier3", 3L
);
final Map<String, Long> actual = Maps.newHashMap();
expected.forEach(
(tier, count) -> stats.addToTieredStat("stat", tier, count)
);
stats.forEachTieredStat("stat0", (tier, count) -> Assert.fail());
stats.forEachTieredStat("stat", actual::put);
Assert.assertEquals(expected, actual);
}
@Test
public void testAccumulate() throws Exception
{
stats.addToGlobalStat("stat1", 1);
stats.addToGlobalStat("stat2", 1);
stats.addToTieredStat("stat1", "tier1", 1);
stats.addToTieredStat("stat1", "tier2", 1);
stats.addToTieredStat("stat2", "tier1", 1);
final CoordinatorStats stats2 = new CoordinatorStats();
stats2.addToGlobalStat("stat1", 1);
stats2.addToTieredStat("stat1", "tier2", 1);
stats2.addToTieredStat("stat2", "tier2", 1);
stats2.addToTieredStat("stat3", "tier1", 1);
stats.accumulate(stats2);
Assert.assertEquals(2, stats.getGlobalStat("stat1"));
Assert.assertEquals(1, stats.getGlobalStat("stat2"));
Assert.assertEquals(1, stats.getTieredStat("stat1", "tier1"));
Assert.assertEquals(2, stats.getTieredStat("stat1", "tier2"));
Assert.assertEquals(1, stats.getTieredStat("stat2", "tier1"));
Assert.assertEquals(1, stats.getTieredStat("stat2", "tier2"));
Assert.assertEquals(1, stats.getTieredStat("stat3", "tier1"));
}
}

View File

@@ -211,8 +211,8 @@ public class DruidCoordinatorBalancerTest
.build();
params = new DruidCoordinatorBalancerTester(coordinator).run(params);
Assert.assertTrue(params.getCoordinatorStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
Assert.assertTrue(params.getCoordinatorStats().getPerTierStats().get("movedCount").get("normal").get() < segments.size());
Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0);
Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") < segments.size());
exec.shutdown();
}
@@ -292,7 +292,7 @@ public class DruidCoordinatorBalancerTest
.build();
params = new DruidCoordinatorBalancerTester(coordinator).run(params);
Assert.assertTrue(params.getCoordinatorStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0);
exec.shutdown();
}
@@ -394,7 +394,7 @@ public class DruidCoordinatorBalancerTest
.build();
params = new DruidCoordinatorBalancerTester(coordinator).run(params);
Assert.assertTrue(params.getCoordinatorStats().getPerTierStats().get("movedCount").get("normal").get() > 0);
Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0);
exec.shutdown();
}

View File

@@ -203,11 +203,11 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 6);
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("normal").get() == 6);
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("cold").get() == 12);
Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
Assert.assertEquals(6L, stats.getTieredStat("assignedCount", "hot"));
Assert.assertEquals(6L, stats.getTieredStat("assignedCount", "normal"));
Assert.assertEquals(12L, stats.getTieredStat("assignedCount", "cold"));
Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
exec.shutdown();
EasyMock.verify(mockPeon);
@@ -305,10 +305,10 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 12);
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("cold").get() == 18);
Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
Assert.assertEquals(12L, stats.getTieredStat("assignedCount", "hot"));
Assert.assertEquals(18L, stats.getTieredStat("assignedCount", "cold"));
Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
exec.shutdown();
EasyMock.verify(mockPeon);
@@ -402,10 +402,10 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 12);
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("normal").get() == 0);
Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
Assert.assertEquals(12L, stats.getTieredStat("assignedCount", "hot"));
Assert.assertEquals(0L, stats.getTieredStat("assignedCount", "normal"));
Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
exec.shutdown();
EasyMock.verify(mockPeon);
@@ -602,7 +602,7 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12);
Assert.assertEquals(12L, stats.getGlobalStat("deletedCount"));
exec.shutdown();
EasyMock.verify(coordinator);
@@ -687,8 +687,8 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 1);
Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12);
Assert.assertEquals(1L, stats.getTieredStat("droppedCount", "normal"));
Assert.assertEquals(12L, stats.getGlobalStat("deletedCount"));
exec.shutdown();
EasyMock.verify(mockPeon);
@@ -779,8 +779,8 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 1);
Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12);
Assert.assertEquals(1L, stats.getTieredStat("droppedCount", "normal"));
Assert.assertEquals(12L, stats.getGlobalStat("deletedCount"));
exec.shutdown();
EasyMock.verify(mockPeon);
@@ -867,8 +867,8 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
Assert.assertTrue(stats.getPerTierStats().get("droppedCount") == null);
Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12);
Assert.assertTrue(stats.getTiers("droppedCount").isEmpty());
Assert.assertEquals(12L, stats.getGlobalStat("deletedCount"));
exec.shutdown();
EasyMock.verify(mockPeon);
@@ -968,7 +968,7 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 1);
Assert.assertEquals(1L, stats.getTieredStat("droppedCount", "normal"));
exec.shutdown();
EasyMock.verify(mockPeon);
@@ -1049,9 +1049,9 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 48);
Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
Assert.assertEquals(48L, stats.getTieredStat("assignedCount", "hot"));
Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
DataSegment overFlowSegment = new DataSegment(
"test",
@@ -1078,9 +1078,9 @@ public class DruidCoordinatorRuleRunnerTest
);
stats = afterParams.getCoordinatorStats();
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 1);
Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
Assert.assertEquals(1L, stats.getTieredStat("assignedCount", "hot"));
Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
EasyMock.verify(mockPeon);
exec.shutdown();
@@ -1180,10 +1180,10 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams afterParams = runner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 24);
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get(DruidServer.DEFAULT_TIER).get() == 7);
Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null);
Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null);
Assert.assertEquals(24L, stats.getTieredStat("assignedCount", "hot"));
Assert.assertEquals(7L, stats.getTieredStat("assignedCount", DruidServer.DEFAULT_TIER));
Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
EasyMock.verify(mockPeon);
exec.shutdown();
@@ -1283,7 +1283,7 @@ public class DruidCoordinatorRuleRunnerTest
CoordinatorStats stats = afterParams.getCoordinatorStats();
// There is no throttling on drop
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 25);
Assert.assertEquals(25L, stats.getTieredStat("droppedCount", "normal"));
EasyMock.verify(mockPeon);
exec.shutdown();
}
@@ -1371,10 +1371,10 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
Assert.assertEquals(1, stats.getPerTierStats().get("assignedCount").size());
Assert.assertEquals(1, stats.getPerTierStats().get("assignedCount").get("_default_tier").get());
Assert.assertNull(stats.getPerTierStats().get("unassignedCount"));
Assert.assertNull(stats.getPerTierStats().get("unassignedSize"));
Assert.assertEquals(1, stats.getTiers("assignedCount").size());
Assert.assertEquals(1, stats.getTieredStat("assignedCount", "_default_tier"));
Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
Assert.assertEquals(2, availableSegments.size());
Assert.assertEquals(availableSegments, params.getAvailableSegments());

View File

@ -31,7 +31,6 @@ import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidDataSource;
import io.druid.client.ImmutableDruidServer;
import io.druid.client.SingleServerInventoryView;
import io.druid.collections.CountingMap;
import io.druid.common.config.JacksonConfigManager;
import io.druid.concurrent.Execs;
import io.druid.curator.CuratorTestBase;
@@ -49,6 +48,7 @@ import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.lookup.cache.LookupCoordinatorManager;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
@@ -330,14 +330,14 @@ public class DruidCoordinatorTest extends CuratorTestBase
Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getLoadStatus());
curator.delete().guaranteed().forPath(ZKPaths.makePath(LOADPATH, dataSegment.getIdentifier()));
// Wait for coordinator thread to run so that replication status is updated
while (coordinator.getSegmentAvailability().snapshot().get(dataSource) != 0) {
while (coordinator.getSegmentAvailability().getLong(dataSource) != 0) {
Thread.sleep(50);
}
Map segmentAvailability = coordinator.getSegmentAvailability().snapshot();
Map segmentAvailability = coordinator.getSegmentAvailability();
Assert.assertEquals(1, segmentAvailability.size());
Assert.assertEquals(0L, segmentAvailability.get(dataSource));
while (coordinator.getLoadPendingDatasources().get(dataSource).get() > 0) {
while (coordinator.hasLoadPending(dataSource)) {
Thread.sleep(50);
}
@@ -348,17 +348,17 @@ public class DruidCoordinatorTest extends CuratorTestBase
Thread.sleep(100);
}
Map<String, CountingMap<String>> replicationStatus = coordinator.getReplicationStatus();
Map<String, ? extends Object2LongMap<String>> replicationStatus = coordinator.getReplicationStatus();
Assert.assertNotNull(replicationStatus);
Assert.assertEquals(1, replicationStatus.entrySet().size());
CountingMap<String> dataSourceMap = replicationStatus.get(tier);
Object2LongMap<String> dataSourceMap = replicationStatus.get(tier);
Assert.assertNotNull(dataSourceMap);
Assert.assertEquals(1, dataSourceMap.size());
Assert.assertNotNull(dataSourceMap.get(dataSource));
// Simulated the adding of segment to druidServer during SegmentChangeRequestLoad event
// The load rules asks for 2 replicas, therefore 1 replica should still be pending
Assert.assertEquals(1L, dataSourceMap.get(dataSource).get());
Assert.assertEquals(1L, dataSourceMap.getLong(dataSource));
coordinator.stop();
leaderUnannouncerLatch.await();

View File

@@ -233,8 +233,8 @@ public class BroadcastDistributionRuleTest
smallSegment
);
assertEquals(3, stats.getGlobalStats().get(LoadRule.ASSIGNED_COUNT).intValue());
assertTrue(stats.getPerTierStats().isEmpty());
assertEquals(3L, stats.getGlobalStat(LoadRule.ASSIGNED_COUNT));
assertEquals(false, stats.hasPerTierStats());
assertTrue(
holdersOfLargeSegments.stream()
@@ -273,8 +273,8 @@ public class BroadcastDistributionRuleTest
smallSegment
);
assertEquals(5, stats.getGlobalStats().get(LoadRule.ASSIGNED_COUNT).intValue());
assertTrue(stats.getPerTierStats().isEmpty());
assertEquals(5L, stats.getGlobalStat(LoadRule.ASSIGNED_COUNT));
assertEquals(false, stats.hasPerTierStats());
assertTrue(
holdersOfLargeSegments.stream()
@@ -311,8 +311,8 @@ public class BroadcastDistributionRuleTest
smallSegment
);
assertEquals(6, stats.getGlobalStats().get(LoadRule.ASSIGNED_COUNT).intValue());
assertTrue(stats.getPerTierStats().isEmpty());
assertEquals(6L, stats.getGlobalStat(LoadRule.ASSIGNED_COUNT));
assertEquals(false, stats.hasPerTierStats());
assertTrue(
druidCluster.getAllServers().stream()

View File

@@ -213,8 +213,8 @@ public class LoadRuleTest
segment
);
Assert.assertTrue(stats.getPerTierStats().get(LoadRule.ASSIGNED_COUNT).get("hot").get() == 1);
Assert.assertTrue(stats.getPerTierStats().get(LoadRule.ASSIGNED_COUNT).get(DruidServer.DEFAULT_TIER).get() == 2);
Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot"));
Assert.assertEquals(2L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, DruidServer.DEFAULT_TIER));
exec.shutdown();
}
@@ -324,8 +324,8 @@ public class LoadRuleTest
segment
);
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("hot").get() == 1);
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get(DruidServer.DEFAULT_TIER).get() == 1);
Assert.assertEquals(1L, stats.getTieredStat("droppedCount", "hot"));
Assert.assertEquals(1L, stats.getTieredStat("droppedCount", DruidServer.DEFAULT_TIER));
exec.shutdown();
}
@@ -415,7 +415,7 @@ public class LoadRuleTest
segment
);
Assert.assertTrue(stats.getPerTierStats().get(LoadRule.ASSIGNED_COUNT).get("hot").get() == 1);
Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot"));
exec.shutdown();
}
@@ -521,7 +521,7 @@ public class LoadRuleTest
segment
);
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("hot").get() == 1);
Assert.assertEquals(1L, stats.getTieredStat("droppedCount", "hot"));
exec.shutdown();
}
}