Avoid many unnecessary materializations of collections of 'all segments in cluster' cardinality (#7185)

* Avoid many  unnecessary materializations of collections of 'all segments in cluster' cardinality

* Fix DruidCoordinatorTest; Renamed DruidCoordinator.getReplicationStatus() to computeUnderReplicationCountsPerDataSourcePerTier()

* More Javadocs, typos, refactor DruidCoordinatorRuntimeParams.createAvailableSegmentsSet()

* Style

* typo

* Disable StaticPseudoFunctionalStyleMethod inspection because of too much false positives

* Fixes
This commit is contained in:
Roman Leventov 2019-03-19 18:22:56 -03:00 committed by GitHub
parent e58f541047
commit dfd27e00c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
47 changed files with 548 additions and 391 deletions

View File

@ -269,6 +269,30 @@
<constraint name="b" within="" contains="" />
<constraint name="c" within="" contains="" />
</searchConfiguration>
<searchConfiguration name="No need to wrap a collection as unmodifiable before addAll()" text="$c$.addAll(Collections.$unmodifiableMethod$($c2$))" recursive="true" caseInsensitive="true" type="JAVA">
<constraint name="__context__" target="true" within="" contains="" />
<constraint name="c" within="" contains="" />
<constraint name="unmodifiableMethod" regexp="unmodifiable.*" within="" contains="" />
<constraint name="c2" within="" contains="" />
</searchConfiguration>
<searchConfiguration name="No need to copy a collection as immutable before addAll()" text="$m$.addAll($ImmutableCollection$.copyOf($x$))" recursive="true" caseInsensitive="true" type="JAVA">
<constraint name="__context__" target="true" within="" contains="" />
<constraint name="m" within="" contains="" />
<constraint name="x" maxCount="2147483647" within="" contains="" />
<constraint name="ImmutableCollection" regexp="Immutable.*" within="" contains="" />
</searchConfiguration>
<searchConfiguration name="No need to wrap a map as unmodifiable before putAll()" text="$m$.putAll(Collections.$unmodifiableMapMethod$($m2$))" recursive="true" caseInsensitive="true" type="JAVA">
<constraint name="__context__" target="true" within="" contains="" />
<constraint name="m" within="" contains="" />
<constraint name="m2" within="" contains="" />
<constraint name="unmodifiableMapMethod" regexp="unmodifiable.*" within="" contains="" />
</searchConfiguration>
<searchConfiguration name="No need to copy a map to immutable before putAll()" text="$m$.putAll($ImmutableMap$.copyOf($x$))" recursive="true" caseInsensitive="true" type="JAVA">
<constraint name="__context__" target="true" within="" contains="" />
<constraint name="m" within="" contains="" />
<constraint name="x" maxCount="2147483647" within="" contains="" />
<constraint name="ImmutableMap" regexp="Immutable.*" within="" contains="" />
</searchConfiguration>
</inspection_tool>
<inspection_tool class="SimplifyStreamApiCallChains" enabled="true" level="ERROR" enabled_by_default="true" />
<inspection_tool class="SpellCheckingInspection" enabled="false" level="TYPO" enabled_by_default="false">
@ -278,6 +302,10 @@
</inspection_tool>
<inspection_tool class="StaticCallOnSubclass" enabled="true" level="ERROR" enabled_by_default="true" />
<inspection_tool class="StaticFieldReferenceOnSubclass" enabled="true" level="ERROR" enabled_by_default="true" />
<inspection_tool class="StaticPseudoFunctionalStyleMethod" enabled="true" level="INFORMATION" enabled_by_default="true">
<!-- The current rate of false-positives produced by this inspection is very high,
see https://youtrack.jetbrains.com/issue/IDEA-153047#focus=streamItem-27-3326648.0-0 -->
</inspection_tool>
<inspection_tool class="StringConcatenationInFormatCall" enabled="true" level="ERROR" enabled_by_default="true" />
<inspection_tool class="StringConcatenationInMessageFormatCall" enabled="true" level="ERROR" enabled_by_default="true" />
<inspection_tool class="StringConcatenationMissingWhitespace" enabled="true" level="WARNING" enabled_by_default="true" />

View File

@ -88,9 +88,9 @@ public class ScheduledExecutors
public void run()
{
try {
log.debug("Running %s (delay %s)", callable, delay);
log.trace("Running %s (delay %s)", callable, delay);
if (callable.call() == Signal.REPEAT) {
log.debug("Rescheduling %s (delay %s)", callable, delay);
log.trace("Rescheduling %s (delay %s)", callable, delay);
exec.schedule(this, delay.getMillis(), TimeUnit.MILLISECONDS);
} else {
log.debug("Stopped rescheduling %s (delay %s)", callable, delay);
@ -154,7 +154,7 @@ public class ScheduledExecutors
}
try {
log.debug("Running %s (period %s)", callable, rate);
log.trace("Running %s (period %s)", callable, rate);
prevSignal = callable.call();
}
catch (Throwable e) {

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.utils;
import java.util.AbstractCollection;
import java.util.Collection;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.function.Supplier;
import java.util.stream.Stream;
public final class CollectionUtils
{
/**
* Returns a lazy collection from a stream supplier and a size. {@link Collection#iterator()} of the returned
* collection delegates to {@link Stream#iterator()} on the stream returned from the supplier.
*/
public static <E> Collection<E> createLazyCollectionFromStream(Supplier<Stream<E>> sequentialStreamSupplier, int size)
{
return new AbstractCollection<E>()
{
@Override
public Iterator<E> iterator()
{
return sequentialStreamSupplier.get().iterator();
}
@Override
public Spliterator<E> spliterator()
{
return sequentialStreamSupplier.get().spliterator();
}
@Override
public Stream<E> stream()
{
return sequentialStreamSupplier.get();
}
@Override
public Stream<E> parallelStream()
{
return sequentialStreamSupplier.get().parallel();
}
@Override
public int size()
{
return size;
}
};
}
private CollectionUtils() {}
}

View File

@ -20,8 +20,6 @@
package org.apache.druid.emitter.ambari.metrics;
import com.fasterxml.jackson.databind.Module;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
@ -35,6 +33,7 @@ import org.apache.druid.java.util.emitter.core.Emitter;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
public class AmbariMetricsEmitterModule implements DruidModule
{
@ -57,17 +56,11 @@ public class AmbariMetricsEmitterModule implements DruidModule
@Named(EMITTER_TYPE)
public Emitter getEmitter(AmbariMetricsEmitterConfig emitterConfig, final Injector injector)
{
List<Emitter> emitters = Lists.transform(
emitterConfig.getAlertEmitters(),
new Function<String, Emitter>()
{
@Override
public Emitter apply(String s)
{
return injector.getInstance(Key.get(Emitter.class, Names.named(s)));
}
}
);
List<Emitter> emitters = emitterConfig
.getAlertEmitters()
.stream()
.map((String name) -> injector.getInstance(Key.get(Emitter.class, Names.named(name))))
.collect(Collectors.toList());
return new AmbariMetricsEmitter(emitterConfig, emitters);
}
}

View File

@ -27,6 +27,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.curator.CuratorUtils;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
@ -156,12 +157,12 @@ public class WorkerTaskMonitor extends WorkerTaskManager
new PathChildrenCacheListener()
{
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent)
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event)
throws Exception
{
if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
if (CuratorUtils.isChildAdded(event)) {
final Task task = jsonMapper.readValue(
cf.getData().forPath(pathChildrenCacheEvent.getData().getPath()),
cf.getData().forPath(event.getData().getPath()),
Task.class
);

View File

@ -219,7 +219,7 @@ public class BrokerServerView implements TimelineServerView
private QueryableDruidServer removeServer(DruidServer server)
{
for (DataSegment segment : server.getSegments()) {
for (DataSegment segment : server.iterateAllSegments()) {
serverRemovedSegment(server.getMetadata(), segment);
}
return clients.remove(server.getName());

View File

@ -123,7 +123,7 @@ public class CoordinatorServerView implements InventoryView
private void removeServer(DruidServer server)
{
for (DataSegment segment : server.getSegments()) {
for (DataSegment segment : server.iterateAllSegments()) {
serverRemovedSegment(server.getMetadata(), segment);
}
}

View File

@ -153,15 +153,27 @@ public class DruidServer implements Comparable<DruidServer>
return metadata.getHostAndTlsPort() != null ? "https" : "http";
}
public Iterable<DataSegment> getSegments()
/**
* Returns an iterable to go over all segments in all data sources, stored on this DruidServer. The order in which
* segments are iterated is unspecified.
*
* Since this DruidServer can be mutated concurrently, the set of segments observed during an iteration may _not_ be
* a momentary snapshot of the segments on the server, in other words, it may be that there was no moment when the
* DruidServer stored exactly the returned set of segments.
*
* Note: the iteration may not be as trivially cheap as, for example, iteration over an ArrayList. Try (to some
* reasonable extent) to organize the code so that it iterates the returned iterable only once rather than several
* times.
*/
public Iterable<DataSegment> iterateAllSegments()
{
return () -> dataSources.values().stream().flatMap(dataSource -> dataSource.getSegments().stream()).iterator();
}
/**
* Returns the current number of segments, stored in this DruidServer object. This number if weakly consistent with
* the number of segments if {@link #getSegments} is iterated about the same time, because segments might be added or
* removed in parallel.
* the number of segments if {@link #iterateAllSegments} is iterated about the same time, because segments might be
* added or removed in parallel.
*/
public int getTotalSegments()
{
@ -200,7 +212,7 @@ public class DruidServer implements Comparable<DruidServer>
public DruidServer addDataSegments(DruidServer server)
{
server.getSegments().forEach(this::addDataSegment);
server.iterateAllSegments().forEach(this::addDataSegment);
return this;
}

View File

@ -70,16 +70,17 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This class uses internal-discovery i.e. {@link DruidNodeDiscoveryProvider} to discover various queryable nodes in the cluster
* such as historicals and realtime peon processes.
* For each queryable server, it uses HTTP GET /druid-internal/v1/segments (see docs in SegmentListerResource.getSegments(..),
* to keep sync'd state of segments served by those servers.
* This class uses internal-discovery i.e. {@link DruidNodeDiscoveryProvider} to discover various queryable nodes in the
* cluster such as historicals and realtime peon processes.
*
* For each queryable server, it uses HTTP GET /druid-internal/v1/segments (see docs for {@link
* org.apache.druid.server.http.SegmentListerResource#getSegments}), to keep sync'd state of segments served by those
* servers.
*/
public class HttpServerInventoryView implements ServerInventoryView, FilteredServerInventoryView
{
public static final TypeReference<ChangeRequestsSnapshot<DataSegmentChangeRequest>> SEGMENT_LIST_RESP_TYPE_REF = new TypeReference<ChangeRequestsSnapshot<DataSegmentChangeRequest>>()
{
};
public static final TypeReference<ChangeRequestsSnapshot<DataSegmentChangeRequest>> SEGMENT_LIST_RESP_TYPE_REF =
new TypeReference<ChangeRequestsSnapshot<DataSegmentChangeRequest>>() {};
private final EmittingLogger log = new EmittingLogger(HttpServerInventoryView.class);
private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
@ -545,7 +546,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
public void fullSync(List<DataSegmentChangeRequest> changes)
{
Map<SegmentId, DataSegment> toRemove = Maps.newHashMapWithExpectedSize(druidServer.getTotalSegments());
druidServer.getSegments().forEach(segment -> toRemove.put(segment.getId(), segment));
druidServer.iterateAllSegments().forEach(segment -> toRemove.put(segment.getId(), segment));
for (DataSegmentChangeRequest request : changes) {
if (request instanceof SegmentChangeRequestLoad) {

View File

@ -26,10 +26,10 @@ import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.utils.CollectionUtils;
import java.util.AbstractCollection;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Iterator;
/**
* This class should not be subclassed, it isn't declared final only to make it possible to mock the class with EasyMock
@ -107,6 +107,7 @@ public class ImmutableDruidServer
return metadata.getPriority();
}
@Nullable
public DataSegment getSegment(SegmentId segmentId)
{
ImmutableDruidDataSource dataSource = dataSources.get(segmentId.getDataSource());
@ -126,22 +127,22 @@ public class ImmutableDruidServer
return dataSources.get(name);
}
public Collection<DataSegment> getSegments()
/**
* Returns a lazy collection with all segments in all data sources, stored on this ImmutableDruidServer. The order
* of segments in this collection is unspecified.
*
* Calling {@link Collection#size()} on the returned collection is cheap, O(1).
*
* Note: iteration over the returned collection may not be as trivially cheap as, for example, iteration over an
* ArrayList. Try (to some reasonable extent) to organize the code so that it iterates the returned collection only
* once rather than several times.
*/
public Collection<DataSegment> getLazyAllSegments()
{
return new AbstractCollection<DataSegment>()
{
@Override
public Iterator<DataSegment> iterator()
{
return dataSources.values().stream().flatMap(dataSource -> dataSource.getSegments().stream()).iterator();
}
@Override
public int size()
{
return totalSegments;
}
};
return CollectionUtils.createLazyCollectionFromStream(
() -> dataSources.values().stream().flatMap(dataSource -> dataSource.getSegments().stream()),
totalSegments
);
}
public String getURL()

View File

@ -20,6 +20,7 @@
package org.apache.druid.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.zookeeper.CreateMode;
@ -127,4 +128,9 @@ public class CuratorUtils
);
}
}
public static boolean isChildAdded(PathChildrenCacheEvent event)
{
return event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED);
}
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.metadata;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
@ -59,6 +60,14 @@ public interface MetadataSegmentManager
Collection<ImmutableDruidDataSource> getDataSources();
/**
* Returns an iterable to go over all segments in all data sources. The order in which segments are iterated is
* unspecified. Note: the iteration may not be as trivially cheap as, for example, iteration over an ArrayList. Try
* (to some reasonable extent) to organize the code so that it iterates the returned iterable only once rather than
* several times.
*/
Iterable<DataSegment> iterateAllSegments();
Collection<String> getAllDataSourceNames();
/**

View File

@ -435,6 +435,12 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
.collect(Collectors.toList());
}
@Override
public Iterable<DataSegment> iterateAllSegments()
{
return () -> dataSources.values().stream().flatMap(dataSource -> dataSource.getSegments().stream()).iterator();
}
@Override
public Collection<String> getAllDataSourceNames()
{

View File

@ -55,8 +55,10 @@ public interface BalancerStrategy
/**
* Pick the best segment to move from one of the supplied set of servers according to the balancing strategy.
* @param serverHolders set of historicals to consider for moving segments
* @return {@link BalancerSegmentHolder} containing segment to move and server it current resides on
* @return {@link BalancerSegmentHolder} containing segment to move and server it currently resides on, or null if
* there are no segments to pick from (i. e. all provided serverHolders are empty).
*/
@Nullable
BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders);
/**

View File

@ -258,7 +258,7 @@ public class CostBalancerStrategy implements BalancerStrategy
{
double cost = 0;
for (ServerHolder server : serverHolders) {
Iterable<DataSegment> segments = server.getServer().getSegments();
Iterable<DataSegment> segments = server.getServer().getLazyAllSegments();
for (DataSegment s : segments) {
cost += computeJointSegmentsCost(s, segments);
}
@ -280,7 +280,7 @@ public class CostBalancerStrategy implements BalancerStrategy
{
double cost = 0;
for (ServerHolder server : serverHolders) {
for (DataSegment segment : server.getServer().getSegments()) {
for (DataSegment segment : server.getServer().getLazyAllSegments()) {
cost += computeJointSegmentsCost(segment, segment);
}
}
@ -334,7 +334,7 @@ public class CostBalancerStrategy implements BalancerStrategy
// the sum of the costs of other (exclusive of the proposalSegment) segments on the server
cost += computeJointSegmentsCost(
proposalSegment,
Iterables.filter(server.getServer().getSegments(), segment -> !proposalSegment.equals(segment))
Iterables.filter(server.getServer().getLazyAllSegments(), segment -> !proposalSegment.equals(segment))
);
// plus the costs of segments that will be loaded

View File

@ -79,13 +79,13 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
private final AtomicInteger failedAssignCount = new AtomicInteger(0);
private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentSkipListMap<>(
DruidCoordinator.SEGMENT_COMPARATOR
DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST
);
private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentSkipListMap<>(
DruidCoordinator.SEGMENT_COMPARATOR
DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST
);
private final ConcurrentSkipListSet<DataSegment> segmentsMarkedToDrop = new ConcurrentSkipListSet<>(
DruidCoordinator.SEGMENT_COMPARATOR
DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST
);
private final Object lock = new Object();

View File

@ -47,8 +47,8 @@ public class DiskNormalizedCostBalancerStrategy extends CostBalancerStrategy
}
int nSegments = 1;
if (server.getServer().getSegments().size() > 0) {
nSegments = server.getServer().getSegments().size();
if (server.getServer().getLazyAllSegments().size() > 0) {
nSegments = server.getServer().getLazyAllSegments().size();
}
double normalizedCost = cost / nSegments;

View File

@ -81,7 +81,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -93,7 +92,26 @@ import java.util.stream.Collectors;
@ManageLifecycle
public class DruidCoordinator
{
public static Comparator<DataSegment> SEGMENT_COMPARATOR = Ordering.from(Comparators.intervalsByEndThenStart())
/**
* This comparator orders "freshest" segments first, i. e. segments with most recent intervals.
*
* It is used in historical nodes' {@link LoadQueuePeon}s to make historicals load more recent segment first.
*
* It is also used in {@link DruidCoordinatorRuntimeParams} for {@link
* DruidCoordinatorRuntimeParams#getAvailableSegments()} - a collection of segments to be considered during some
* coordinator run for different {@link DruidCoordinatorHelper}s. The order matters only for {@link
* DruidCoordinatorRuleRunner}, which tries to apply the rules while iterating the segments in the order imposed by
* this comparator. In {@link LoadRule} the throttling limit may be hit (via {@link ReplicationThrottler}; see
* {@link CoordinatorDynamicConfig#getReplicationThrottleLimit()}). So before we potentially hit this limit, we want
* to schedule loading the more recent segments (among all of those that need to be loaded).
*
* In both {@link LoadQueuePeon}s and {@link DruidCoordinatorRuleRunner}, we want to load more recent segments first
* because presumably they are queried more often and contain are more important data for users, so if the Druid
* cluster has availability problems and struggling to make all segments available immediately, at least we try to
* make more "important" (more recent) segments available as soon as possible.
*/
static final Comparator<DataSegment> SEGMENT_COMPARATOR_RECENT_FIRST = Ordering
.from(Comparators.intervalsByEndThenStart())
.onResultOf(DataSegment::getInterval)
.compound(Ordering.<DataSegment>natural())
.reverse();
@ -224,17 +242,18 @@ public class DruidCoordinator
return loadManagementPeons;
}
public Map<String, ? extends Object2LongMap<String>> getReplicationStatus()
/** @return tier -> { dataSource -> underReplicationCount } map */
public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTier()
{
final Map<String, Object2LongOpenHashMap<String>> retVal = new HashMap<>();
final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
if (segmentReplicantLookup == null) {
return retVal;
return underReplicationCountsPerDataSourcePerTier;
}
final DateTime now = DateTimes.nowUtc();
for (final DataSegment segment : getAvailableDataSegments()) {
for (final DataSegment segment : iterateAvailableDataSegments()) {
final List<Rule> rules = metadataRuleManager.getRulesWithDefault(segment.getDataSource());
for (final Rule rule : rules) {
@ -246,15 +265,16 @@ public class DruidCoordinator
.getTieredReplicants()
.forEach((final String tier, final Integer ruleReplicants) -> {
int currentReplicants = segmentReplicantLookup.getLoadedReplicants(segment.getId(), tier);
retVal
.computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>())
Object2LongMap<String> underReplicationPerDataSource = underReplicationCountsPerDataSourcePerTier
.computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>());
((Object2LongOpenHashMap<String>) underReplicationPerDataSource)
.addTo(segment.getDataSource(), Math.max(ruleReplicants - currentReplicants, 0));
});
break; // only the first matching rule applies
}
}
return retVal;
return underReplicationCountsPerDataSourcePerTier;
}
public Object2LongMap<String> getSegmentAvailability()
@ -265,7 +285,7 @@ public class DruidCoordinator
return retVal;
}
for (DataSegment segment : getAvailableDataSegments()) {
for (DataSegment segment : iterateAvailableDataSegments()) {
if (segmentReplicantLookup.getLoadedReplicants(segment.getId()) == 0) {
retVal.addTo(segment.getDataSource(), 1);
} else {
@ -428,30 +448,15 @@ public class DruidCoordinator
}
}
public Set<DataSegment> getOrderedAvailableDataSegments()
/**
* Returns an iterable to go over all available segments in all data sources. The order in which segments are iterated
* is unspecified. Note: the iteration may not be as trivially cheap as, for example, iteration over an ArrayList. Try
* (to some reasonable extent) to organize the code so that it iterates the returned iterable only once rather than
* several times.
*/
public Iterable<DataSegment> iterateAvailableDataSegments()
{
Set<DataSegment> availableSegments = new TreeSet<>(SEGMENT_COMPARATOR);
Iterable<DataSegment> dataSegments = getAvailableDataSegments();
for (DataSegment dataSegment : dataSegments) {
if (dataSegment.getSize() < 0) {
log.makeAlert("No size on Segment, wtf?")
.addData("segment", dataSegment)
.emit();
}
availableSegments.add(dataSegment);
}
return availableSegments;
}
private List<DataSegment> getAvailableDataSegments()
{
return metadataSegmentManager.getDataSources()
.stream()
.flatMap(source -> source.getSegments().stream())
.collect(Collectors.toList());
return metadataSegmentManager.iterateAllSegments();
}
@LifecycleStart
@ -735,7 +740,7 @@ public class DruidCoordinator
.build();
},
new DruidCoordinatorRuleRunner(DruidCoordinator.this),
new DruidCoordinatorCleanupUnneeded(DruidCoordinator.this),
new DruidCoordinatorCleanupUnneeded(),
new DruidCoordinatorCleanupOvershadowed(DruidCoordinator.this),
new DruidCoordinatorBalancer(DruidCoordinator.this),
new DruidCoordinatorLogger(DruidCoordinator.this)

View File

@ -20,6 +20,7 @@
package org.apache.druid.server.coordinator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@ -28,24 +29,35 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
/**
*/
public class DruidCoordinatorRuntimeParams
{
/**
* Creates a TreeSet sorted in {@link DruidCoordinator#SEGMENT_COMPARATOR_RECENT_FIRST} order and populates it with
* the segments from the given iterable. The given iterable is iterated exactly once. No special action is taken if
* duplicate segments are encountered in the iterable.
*/
public static TreeSet<DataSegment> createAvailableSegmentsSet(Iterable<DataSegment> availableSegments)
{
TreeSet<DataSegment> segmentsSet = new TreeSet<>(DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST);
availableSegments.forEach(segmentsSet::add);
return segmentsSet;
}
private final long startTime;
private final DruidCluster druidCluster;
private final MetadataRuleManager databaseRuleManager;
private final SegmentReplicantLookup segmentReplicantLookup;
private final Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources;
private final Set<DataSegment> availableSegments;
private final @Nullable TreeSet<DataSegment> availableSegments;
private final Map<String, LoadQueuePeon> loadManagementPeons;
private final ReplicationThrottler replicationManager;
private final ServiceEmitter emitter;
@ -61,7 +73,7 @@ public class DruidCoordinatorRuntimeParams
MetadataRuleManager databaseRuleManager,
SegmentReplicantLookup segmentReplicantLookup,
Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources,
Set<DataSegment> availableSegments,
@Nullable TreeSet<DataSegment> availableSegments,
Map<String, LoadQueuePeon> loadManagementPeons,
ReplicationThrottler replicationManager,
ServiceEmitter emitter,
@ -113,8 +125,9 @@ public class DruidCoordinatorRuntimeParams
return dataSources;
}
public Set<DataSegment> getAvailableSegments()
public TreeSet<DataSegment> getAvailableSegments()
{
Preconditions.checkState(availableSegments != null, "availableSegments must be set");
return availableSegments;
}
@ -196,7 +209,7 @@ public class DruidCoordinatorRuntimeParams
databaseRuleManager,
segmentReplicantLookup,
dataSources,
new TreeSet<>(DruidCoordinator.SEGMENT_COMPARATOR),
null, // availableSegments
loadManagementPeons,
replicationManager,
emitter,
@ -215,7 +228,7 @@ public class DruidCoordinatorRuntimeParams
private MetadataRuleManager databaseRuleManager;
private SegmentReplicantLookup segmentReplicantLookup;
private Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources;
private final Set<DataSegment> availableSegments;
private @Nullable TreeSet<DataSegment> availableSegments;
private final Map<String, LoadQueuePeon> loadManagementPeons;
private ReplicationThrottler replicationManager;
private ServiceEmitter emitter;
@ -232,7 +245,7 @@ public class DruidCoordinatorRuntimeParams
this.databaseRuleManager = null;
this.segmentReplicantLookup = null;
this.dataSources = new HashMap<>();
this.availableSegments = new TreeSet<>(DruidCoordinator.SEGMENT_COMPARATOR);
this.availableSegments = null;
this.loadManagementPeons = new HashMap<>();
this.replicationManager = null;
this.emitter = null;
@ -248,7 +261,7 @@ public class DruidCoordinatorRuntimeParams
MetadataRuleManager databaseRuleManager,
SegmentReplicantLookup segmentReplicantLookup,
Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources,
Set<DataSegment> availableSegments,
@Nullable TreeSet<DataSegment> availableSegments,
Map<String, LoadQueuePeon> loadManagementPeons,
ReplicationThrottler replicationManager,
ServiceEmitter emitter,
@ -346,22 +359,37 @@ public class DruidCoordinatorRuntimeParams
return this;
}
/** This method must be used in test code only. */
@VisibleForTesting
public Builder withAvailableSegments(DataSegment... availableSegments)
public Builder withAvailableSegmentsInTest(DataSegment... availableSegments)
{
this.availableSegments.addAll(Arrays.asList(availableSegments));
return this;
return withAvailableSegmentsInTest(Arrays.asList(availableSegments));
}
public Builder withAvailableSegments(Collection<DataSegment> availableSegments)
/** This method must be used in test code only. */
@VisibleForTesting
public Builder withAvailableSegmentsInTest(Collection<DataSegment> availableSegments)
{
this.availableSegments.addAll(Collections.unmodifiableCollection(availableSegments));
return setAvailableSegments(createAvailableSegmentsSet(availableSegments));
}
/**
* Note: unlike {@link #withAvailableSegmentsInTest(Collection)}, this method doesn't make a defensive copy of the
* provided set. The set passed into this method must not be modified afterwards.
*/
public Builder setAvailableSegments(TreeSet<DataSegment> availableSegments)
{
//noinspection ObjectEquality
if (availableSegments.comparator() != DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST) {
throw new IllegalArgumentException("Expected DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST");
}
this.availableSegments = availableSegments;
return this;
}
public Builder withLoadManagementPeons(Map<String, LoadQueuePeon> loadManagementPeonsCollection)
{
loadManagementPeons.putAll(Collections.unmodifiableMap(loadManagementPeonsCollection));
loadManagementPeons.putAll(loadManagementPeonsCollection);
return this;
}

View File

@ -81,13 +81,13 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
private final AtomicInteger failedAssignCount = new AtomicInteger(0);
private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentSkipListMap<>(
DruidCoordinator.SEGMENT_COMPARATOR
DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST
);
private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentSkipListMap<>(
DruidCoordinator.SEGMENT_COMPARATOR
DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST
);
private final ConcurrentSkipListSet<DataSegment> segmentsMarkedToDrop = new ConcurrentSkipListSet<>(
DruidCoordinator.SEGMENT_COMPARATOR
DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST
);
private final ScheduledExecutorService processingExecutor;

View File

@ -34,7 +34,7 @@ final class ReservoirSegmentSampler
int numSoFar = 0;
for (ServerHolder server : serverHolders) {
for (DataSegment segment : server.getServer().getSegments()) {
for (DataSegment segment : server.getServer().getLazyAllSegments()) {
int randNum = ThreadLocalRandom.current().nextInt(numSoFar + 1);
// w.p. 1 / (numSoFar+1), swap out the server and segment
if (randNum == numSoFar) {

View File

@ -43,7 +43,7 @@ public class SegmentReplicantLookup
for (ServerHolder serverHolder : serversByType) {
ImmutableDruidServer server = serverHolder.getServer();
for (DataSegment segment : server.getSegments()) {
for (DataSegment segment : server.getLazyAllSegments()) {
Integer numReplicants = segmentsInCluster.get(segment.getId(), server.getTier());
if (numReplicants == null) {
numReplicants = 0;

View File

@ -134,7 +134,7 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
int numSegments = 0;
for (ServerHolder sourceHolder : servers) {
numSegments += sourceHolder.getServer().getSegments().size();
numSegments += sourceHolder.getServer().getLazyAllSegments().size();
}
if (numSegments == 0) {
@ -191,7 +191,18 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
//noinspection ForLoopThatDoesntUseLoopVariable
for (int iter = 0; (moved + unmoved) < maxSegmentsToMove; ++iter) {
final BalancerSegmentHolder segmentToMoveHolder = strategy.pickSegmentToMove(toMoveFrom);
if (segmentToMoveHolder != null && params.getAvailableSegments().contains(segmentToMoveHolder.getSegment())) {
if (segmentToMoveHolder == null) {
log.info("All servers to move segments from are empty, ending run.");
break;
}
// DruidCoordinatorRuntimeParams.getAvailableSegments originate from MetadataSegmentManager, i. e. that's a
// "desired" or "theoretical" set of segments. segmentToMoveHolder.getSegment originates from ServerInventoryView,
// i. e. that may be any segment that happens to be loaded on some server, even if it "shouldn't" from the
// "theoretical" point of view (Coordinator closes such discrepancies eventually via
// DruidCoordinatorCleanupUnneeded). Therefore the picked segmentToMoveHolder's segment may not need to be
// balanced.
boolean needToBalancePickedSegment = params.getAvailableSegments().contains(segmentToMoveHolder.getSegment());
if (needToBalancePickedSegment) {
final DataSegment segmentToMove = segmentToMoveHolder.getSegment();
final ImmutableDruidServer fromServer = segmentToMoveHolder.getFromServer();
// we want to leave the server the segment is currently on in the list...

View File

@ -24,7 +24,6 @@ import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.LoadQueuePeon;
import org.apache.druid.server.coordinator.ServerHolder;
@ -39,15 +38,6 @@ public class DruidCoordinatorCleanupUnneeded implements DruidCoordinatorHelper
{
private static final Logger log = new Logger(DruidCoordinatorCleanupUnneeded.class);
private final DruidCoordinator coordinator;
public DruidCoordinatorCleanupUnneeded(
DruidCoordinator coordinator
)
{
this.coordinator = coordinator;
}
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
@ -55,13 +45,21 @@ public class DruidCoordinatorCleanupUnneeded implements DruidCoordinatorHelper
Set<DataSegment> availableSegments = params.getAvailableSegments();
DruidCluster cluster = params.getDruidCluster();
if (availableSegments.isEmpty()) {
log.info(
"Found 0 availableSegments, skipping the cleanup of segments from historicals. This is done to prevent " +
"a race condition in which the coordinator would drop all segments if it started running cleanup before " +
"it finished polling the metadata storage for available segments for the first time."
);
return params.buildFromExisting().withCoordinatorStats(stats).build();
}
// Drop segments that no longer exist in the available segments configuration, *if* it has been populated. (It might
// not have been loaded yet since it's filled asynchronously. But it's also filled atomically, so if there are any
// segments at all, we should have all of them.)
// Note that if metadata store has no segments, then availableSegments will stay empty and nothing will be dropped.
// This is done to prevent a race condition in which the coordinator would drop all segments if it started running
// cleanup before it finished polling the metadata storage for available segments for the first time.
if (!availableSegments.isEmpty()) {
for (SortedSet<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
for (ServerHolder serverHolder : serverHolders) {
ImmutableDruidServer server = serverHolder.getServer();
@ -86,16 +84,7 @@ public class DruidCoordinatorCleanupUnneeded implements DruidCoordinatorHelper
}
}
}
} else {
log.info(
"Found 0 availableSegments, skipping the cleanup of segments from historicals. This is done to prevent a race condition in which the coordinator would drop all segments if it started running cleanup before it finished polling the metadata storage for available segments for the first time."
);
}
return params.buildFromExisting()
.withCoordinatorStats(stats)
.build();
return params.buildFromExisting().withCoordinatorStats(stats).build();
}
}

View File

@ -227,17 +227,17 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper
}
);
coordinator.getReplicationStatus().forEach(
(final String tier, final Object2LongMap<String> status) -> {
for (final Object2LongMap.Entry<String> entry : status.object2LongEntrySet()) {
coordinator.computeUnderReplicationCountsPerDataSourcePerTier().forEach(
(final String tier, final Object2LongMap<String> underReplicationCountsPerDataSource) -> {
for (final Object2LongMap.Entry<String> entry : underReplicationCountsPerDataSource.object2LongEntrySet()) {
final String dataSource = entry.getKey();
final long count = entry.getLongValue();
final long underReplicationCount = entry.getLongValue();
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.TIER, tier)
.setDimension(DruidMetrics.DATASOURCE, dataSource).build(
"segment/underReplicated/count", count
"segment/underReplicated/count", underReplicationCount
)
);
}

View File

@ -89,42 +89,15 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
// find available segments which are not overshadowed by other segments in DB
// only those would need to be loaded/dropped
// anything overshadowed by served segments is dropped automatically by DruidCoordinatorCleanupOvershadowed
Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = new HashMap<>();
for (DataSegment segment : params.getAvailableSegments()) {
VersionedIntervalTimeline<String, DataSegment> timeline = timelines.get(segment.getDataSource());
if (timeline == null) {
timeline = new VersionedIntervalTimeline<>(Ordering.natural());
timelines.put(segment.getDataSource(), timeline);
}
timeline.add(
segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)
);
}
Set<DataSegment> overshadowed = new HashSet<>();
for (VersionedIntervalTimeline<String, DataSegment> timeline : timelines.values()) {
for (TimelineObjectHolder<String, DataSegment> holder : timeline.findOvershadowed()) {
for (DataSegment dataSegment : holder.getObject().payloads()) {
overshadowed.add(dataSegment);
}
}
}
Set<DataSegment> nonOvershadowed = new HashSet<>();
for (DataSegment dataSegment : params.getAvailableSegments()) {
if (!overshadowed.contains(dataSegment)) {
nonOvershadowed.add(dataSegment);
}
}
Set<DataSegment> overshadowed = determineOvershadowedSegments(params);
for (String tier : cluster.getTierNames()) {
replicatorThrottler.updateReplicationState(tier);
}
DruidCoordinatorRuntimeParams paramsWithReplicationManager = params.buildFromExistingWithoutAvailableSegments()
DruidCoordinatorRuntimeParams paramsWithReplicationManager = params
.buildFromExistingWithoutAvailableSegments()
.withReplicationManager(replicatorThrottler)
.withAvailableSegments(nonOvershadowed)
.build();
// Run through all matched rules for available segments
@ -133,7 +106,11 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
final List<SegmentId> segmentsWithMissingRules = Lists.newArrayListWithCapacity(MAX_MISSING_RULES);
int missingRules = 0;
for (DataSegment segment : paramsWithReplicationManager.getAvailableSegments()) {
for (DataSegment segment : params.getAvailableSegments()) {
if (overshadowed.contains(segment)) {
// Skipping overshadowed segments
continue;
}
List<Rule> rules = databaseRuleManager.getRulesWithDefault(segment.getDataSource());
boolean foundMatchingRule = false;
for (Rule rule : rules) {
@ -159,9 +136,26 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
.emit();
}
return paramsWithReplicationManager.buildFromExistingWithoutAvailableSegments()
.withCoordinatorStats(stats)
.withAvailableSegments(params.getAvailableSegments())
.build();
return params.buildFromExisting().withCoordinatorStats(stats).build();
}
private Set<DataSegment> determineOvershadowedSegments(DruidCoordinatorRuntimeParams params)
{
Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = new HashMap<>();
for (DataSegment segment : params.getAvailableSegments()) {
timelines
.computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural()))
.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
}
Set<DataSegment> overshadowed = new HashSet<>();
for (VersionedIntervalTimeline<String, DataSegment> timeline : timelines.values()) {
for (TimelineObjectHolder<String, DataSegment> holder : timeline.findOvershadowed()) {
for (DataSegment dataSegment : holder.getObject().payloads()) {
overshadowed.add(dataSegment);
}
}
}
return overshadowed;
}
}

View File

@ -19,18 +19,19 @@
package org.apache.druid.server.coordinator.helper;
import org.apache.druid.java.util.common.logger.Logger;
import com.google.common.collect.Iterables;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.timeline.DataSegment;
import java.util.Set;
import java.util.TreeSet;
public class DruidCoordinatorSegmentInfoLoader implements DruidCoordinatorHelper
{
private final DruidCoordinator coordinator;
private static final EmittingLogger log = new EmittingLogger(DruidCoordinatorSegmentInfoLoader.class);
private static final Logger log = new Logger(DruidCoordinatorSegmentInfoLoader.class);
private final DruidCoordinator coordinator;
public DruidCoordinatorSegmentInfoLoader(DruidCoordinator coordinator)
{
@ -42,8 +43,31 @@ public class DruidCoordinatorSegmentInfoLoader implements DruidCoordinatorHelper
{
log.info("Starting coordination. Getting available segments.");
// Display info about all available segments
final Set<DataSegment> availableSegments = coordinator.getOrderedAvailableDataSegments();
// The following transform() call doesn't actually transform the iterable. It only checks the sizes of the segments
// and emits alerts if segments with negative sizes are encountered. In other words, semantically it's similar to
// Stream.peek(). It works as long as DruidCoordinatorRuntimeParams.createAvailableSegmentsSet() (which is called
// below) guarantees to go over the passed iterable exactly once.
//
// An iterable returned from iterateAvailableDataSegments() is not simply iterated (with size checks) before passing
// into DruidCoordinatorRuntimeParams.createAvailableSegmentsSet() because iterateAvailableDataSegments()'s
// documentation says to strive to avoid iterating the result more than once.
//
//noinspection StaticPseudoFunctionalStyleMethod: https://youtrack.jetbrains.com/issue/IDEA-153047
Iterable<DataSegment> availableSegmentsWithSizeChecking = Iterables.transform(
coordinator.iterateAvailableDataSegments(),
segment -> {
if (segment.getSize() < 0) {
log.makeAlert("No size on a segment")
.addData("segment", segment)
.emit();
}
return segment;
}
);
final TreeSet<DataSegment> availableSegments =
DruidCoordinatorRuntimeParams.createAvailableSegmentsSet(availableSegmentsWithSizeChecking);
// Log info about all available segments
if (log.isDebugEnabled()) {
log.debug("Available DataSegments");
for (DataSegment dataSegment : availableSegments) {
@ -54,7 +78,7 @@ public class DruidCoordinatorSegmentInfoLoader implements DruidCoordinatorHelper
log.info("Found [%,d] available segments.", availableSegments.size());
return params.buildFromExisting()
.withAvailableSegments(availableSegments)
.setAvailableSegments(availableSegments)
.build();
}
}

View File

@ -43,7 +43,6 @@ import org.joda.time.Interval;
@JsonSubTypes.Type(name = IntervalBroadcastDistributionRule.TYPE, value = IntervalBroadcastDistributionRule.class),
@JsonSubTypes.Type(name = PeriodBroadcastDistributionRule.TYPE, value = PeriodBroadcastDistributionRule.class)
})
public interface Rule
{
String getType();
@ -52,5 +51,16 @@ public interface Rule
boolean appliesTo(Interval interval, DateTime referenceTimestamp);
/**
* {@link DruidCoordinatorRuntimeParams#getAvailableSegments()} must not be called in Rule's code, because the
* available segments are not specified for the {@link DruidCoordinatorRuntimeParams} passed into Rule's code. This is
* because {@link DruidCoordinatorRuntimeParams} entangles two slightly different (nonexistent yet) abstractions:
* "DruidCoordinatorHelperParams" and "RuleParams" which contain params that only {@link
* org.apache.druid.server.coordinator.helper.DruidCoordinatorHelper}s and Rules need, respectively.
* For example, {@link org.apache.druid.server.coordinator.ReplicationThrottler} needs to belong only to "RuleParams",
* but not "DruidCoordinatorHelperParams". The opposite for "AvailableSegments".
*
* See https://github.com/apache/incubator-druid/issues/7228
*/
CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntimeParams params, DataSegment segment);
}

View File

@ -95,7 +95,7 @@ public class CoordinatorResource
}
if (full != null) {
return Response.ok(coordinator.getReplicationStatus()).build();
return Response.ok(coordinator.computeUnderReplicationCountsPerDataSourcePerTier()).build();
}
return Response.ok(coordinator.getLoadStatus()).build();
}

View File

@ -80,8 +80,8 @@ public class ServersResource
* *and* values, but is done so for compatibility with the existing HTTP JSON API.
*
* Returns a lazy map suitable for serialization (i. e. entrySet iteration) only, relying on the fact that the
* segments returned from {@link DruidServer#getSegments()} are unique. This is not a part of the {@link DruidServer}
* API to not let abuse this map (like trying to get() from it).
* segments returned from {@link DruidServer#iterateAllSegments()} are unique. This is not a part of the {@link
* DruidServer} API to not let abuse this map (like trying to get() from it).
*/
private static Map<SegmentId, DataSegment> createLazySegmentsMap(DruidServer server)
{
@ -96,7 +96,7 @@ public class ServersResource
public Iterator<Entry<SegmentId, DataSegment>> iterator()
{
return Iterators.transform(
server.getSegments().iterator(),
server.iterateAllSegments().iterator(),
segment -> new AbstractMap.SimpleImmutableEntry<>(segment.getId(), segment)
);
}
@ -173,11 +173,11 @@ public class ServersResource
}
if (full != null) {
return builder.entity(Iterables.toString(server.getSegments())).build();
return builder.entity(Iterables.toString(server.iterateAllSegments())).build();
}
return builder
.entity(Iterables.toString(Iterables.transform(server.getSegments(), DataSegment::getId)))
.entity(Iterables.toString(Iterables.transform(server.iterateAllSegments(), DataSegment::getId)))
.build();
}

View File

@ -104,7 +104,7 @@ public class TiersResource
Map<String, Map<Interval, Map<IntervalProperties, Object>>> tierToStatsPerInterval = new HashMap<>();
for (DruidServer druidServer : serverInventoryView.getInventory()) {
if (druidServer.getTier().equalsIgnoreCase(tierName)) {
for (DataSegment dataSegment : druidServer.getSegments()) {
for (DataSegment dataSegment : druidServer.iterateAllSegments()) {
Map<IntervalProperties, Object> properties = tierToStatsPerInterval
.computeIfAbsent(dataSegment.getDataSource(), dsName -> new HashMap<>())
.computeIfAbsent(dataSegment.getInterval(), interval -> new EnumMap<>(IntervalProperties.class));

View File

@ -259,7 +259,7 @@ public class HttpServerInventoryViewTest
DruidServer druidServer = httpServerInventoryView.getInventoryValue("host:8080");
Assert.assertEquals(
ImmutableMap.of(segment3.getId(), segment3, segment4.getId(), segment4),
Maps.uniqueIndex(druidServer.getSegments(), DataSegment::getId)
Maps.uniqueIndex(druidServer.iterateAllSegments(), DataSegment::getId)
);
druidNodeDiscovery.listener.nodesRemoved(ImmutableList.of(druidNode));

View File

@ -237,7 +237,7 @@ public class BatchServerInventoryViewTest
waitForSync(batchServerInventoryView, testSegments);
DruidServer server = Iterables.get(batchServerInventoryView.getInventory(), 0);
Set<DataSegment> segments = Sets.newHashSet(server.getSegments());
Set<DataSegment> segments = Sets.newHashSet(server.iterateAllSegments());
Assert.assertEquals(testSegments, segments);
@ -251,7 +251,7 @@ public class BatchServerInventoryViewTest
waitForSync(batchServerInventoryView, testSegments);
Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments()));
Assert.assertEquals(testSegments, Sets.newHashSet(server.iterateAllSegments()));
segmentAnnouncer.unannounceSegment(segment1);
segmentAnnouncer.unannounceSegment(segment2);
@ -260,7 +260,7 @@ public class BatchServerInventoryViewTest
waitForSync(batchServerInventoryView, testSegments);
Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments()));
Assert.assertEquals(testSegments, Sets.newHashSet(server.iterateAllSegments()));
}
@Test
@ -271,7 +271,7 @@ public class BatchServerInventoryViewTest
waitForSync(filteredBatchServerInventoryView, testSegments);
DruidServer server = Iterables.get(filteredBatchServerInventoryView.getInventory(), 0);
Set<DataSegment> segments = Sets.newHashSet(server.getSegments());
Set<DataSegment> segments = Sets.newHashSet(server.iterateAllSegments());
Assert.assertEquals(testSegments, segments);
int prevUpdateCount = inventoryUpdateCounter.get();
@ -297,7 +297,7 @@ public class BatchServerInventoryViewTest
waitForSync(filteredBatchServerInventoryView, testSegments);
DruidServer server = Iterables.get(filteredBatchServerInventoryView.getInventory(), 0);
Set<DataSegment> segments = Sets.newHashSet(server.getSegments());
Set<DataSegment> segments = Sets.newHashSet(server.iterateAllSegments());
Assert.assertEquals(testSegments, segments);
@ -393,7 +393,7 @@ public class BatchServerInventoryViewTest
final Timing forWaitingTiming = timing.forWaiting();
Stopwatch stopwatch = Stopwatch.createStarted();
while (Iterables.isEmpty(batchServerInventoryView.getInventory())
|| Iterables.size(Iterables.get(batchServerInventoryView.getInventory(), 0).getSegments()) !=
|| Iterables.size(Iterables.get(batchServerInventoryView.getInventory(), 0).iterateAllSegments()) !=
testSegments.size()) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > forWaitingTiming.milliseconds()) {
@ -430,7 +430,7 @@ public class BatchServerInventoryViewTest
waitForSync(batchServerInventoryView, testSegments);
DruidServer server = Iterables.get(batchServerInventoryView.getInventory(), 0);
final Set<DataSegment> segments = Sets.newHashSet(server.getSegments());
final Set<DataSegment> segments = Sets.newHashSet(server.iterateAllSegments());
Assert.assertEquals(testSegments, segments);
@ -498,7 +498,7 @@ public class BatchServerInventoryViewTest
Assert.assertEquals(INITIAL_SEGMENTS * 2, testSegments.size());
waitForSync(batchServerInventoryView, testSegments);
Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments()));
Assert.assertEquals(testSegments, Sets.newHashSet(server.iterateAllSegments()));
for (int i = 0; i < INITIAL_SEGMENTS; ++i) {
final DataSegment segment = makeSegment(100 + i);
@ -508,6 +508,6 @@ public class BatchServerInventoryViewTest
waitForSync(batchServerInventoryView, testSegments);
Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments()));
Assert.assertEquals(testSegments, Sets.newHashSet(server.iterateAllSegments()));
}
}

View File

@ -94,7 +94,7 @@ public class CostBalancerStrategyTest
segments.put(segment.getId(), segment);
EasyMock.expect(druidServer.getSegment(segment.getId())).andReturn(segment).anyTimes();
}
EasyMock.expect(druidServer.getSegments()).andReturn(segments.values()).anyTimes();
EasyMock.expect(druidServer.getLazyAllSegments()).andReturn(segments.values()).anyTimes();
EasyMock.replay(druidServer);
serverHolderList.add(new ServerHolder(druidServer, fromPeon));

View File

@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.utils.ZKPaths;
@ -35,6 +36,7 @@ import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.curator.CuratorTestBase;
import org.apache.druid.curator.CuratorUtils;
import org.apache.druid.curator.discovery.NoopServiceAnnouncer;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.jackson.DefaultObjectMapper;
@ -324,10 +326,10 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
// for the destination and unannouncing from source server when noticing a drop request
sourceLoadQueueChildrenCache.getListenable().addListener(
(curatorFramework, pathChildrenCacheEvent) -> {
if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
(CuratorFramework curatorFramework, PathChildrenCacheEvent event) -> {
if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
srcCountdown.countDown();
} else if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
} else if (CuratorUtils.isChildAdded(event)) {
//Simulate source server dropping segment
unannounceSegmentFromBatchForServer(source, segmentToMove, sourceSegKeys.get(2), zkPathsConfig);
}
@ -335,10 +337,10 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
);
destinationLoadQueueChildrenCache.getListenable().addListener(
(curatorFramework, pathChildrenCacheEvent) -> {
if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
(CuratorFramework curatorFramework, PathChildrenCacheEvent event) -> {
if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
destCountdown.countDown();
} else if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
} else if (CuratorUtils.isChildAdded(event)) {
//Simulate destination server loading segment
announceBatchSegmentsForServer(dest, ImmutableSet.of(segmentToMove), zkPathsConfig, jsonMapper);
}

View File

@ -88,7 +88,7 @@ public class DiskNormalizedCostBalancerStrategyTest
segments.add(segment);
EasyMock.expect(druidServer.getSegment(segment.getId())).andReturn(segment).anyTimes();
}
EasyMock.expect(druidServer.getSegments()).andReturn(segments).anyTimes();
EasyMock.expect(druidServer.getLazyAllSegments()).andReturn(segments).anyTimes();
EasyMock.replay(druidServer);
serverHolderList.add(new ServerHolder(druidServer, fromPeon));

View File

@ -116,9 +116,9 @@ public class DruidCoordinatorBalancerProfiler
EasyMock.expect(server.getName()).andReturn(Integer.toString(i)).atLeastOnce();
EasyMock.expect(server.getHost()).andReturn(Integer.toString(i)).anyTimes();
if (i == 0) {
EasyMock.expect(server.getSegments()).andReturn(segments).anyTimes();
EasyMock.expect(server.getLazyAllSegments()).andReturn(segments).anyTimes();
} else {
EasyMock.expect(server.getSegments()).andReturn(Collections.emptyList()).anyTimes();
EasyMock.expect(server.getLazyAllSegments()).andReturn(Collections.emptyList()).anyTimes();
}
EasyMock.expect(server.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.replay(server);
@ -148,7 +148,7 @@ public class DruidCoordinatorBalancerProfiler
.withLoadManagementPeons(
peonMap
)
.withAvailableSegments(segments)
.withAvailableSegmentsInTest(segments)
.withDynamicConfigs(
CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(
MAX_SEGMENTS_TO_MOVE
@ -197,7 +197,7 @@ public class DruidCoordinatorBalancerProfiler
EasyMock.expect(druidServer1.getName()).andReturn("from").atLeastOnce();
EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes();
EasyMock.expect(druidServer1.getLazyAllSegments()).andReturn(segments).anyTimes();
EasyMock.expect(druidServer1.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer1);
@ -205,7 +205,7 @@ public class DruidCoordinatorBalancerProfiler
EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer2.getSegments()).andReturn(Collections.emptyList()).anyTimes();
EasyMock.expect(druidServer2.getLazyAllSegments()).andReturn(Collections.emptyList()).anyTimes();
EasyMock.expect(druidServer2.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer2);
@ -246,7 +246,7 @@ public class DruidCoordinatorBalancerProfiler
toPeon
)
)
.withAvailableSegments(segments)
.withAvailableSegmentsInTest(segments)
.withDynamicConfigs(
CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(
MAX_SEGMENTS_TO_MOVE

View File

@ -535,7 +535,7 @@ public class DruidCoordinatorBalancerTest
.boxed()
.collect(Collectors.toMap(i -> String.valueOf(i + 1), peons::get))
)
.withAvailableSegments(segments)
.withAvailableSegmentsInTest(segments)
.withDynamicConfigs(
CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(
MAX_SEGMENTS_TO_MOVE
@ -558,7 +558,7 @@ public class DruidCoordinatorBalancerTest
EasyMock.expect(druidServer.getTier()).andReturn(tier).anyTimes();
EasyMock.expect(druidServer.getCurrSize()).andReturn(currentSize).atLeastOnce();
EasyMock.expect(druidServer.getMaxSize()).andReturn(maxSize).atLeastOnce();
EasyMock.expect(druidServer.getSegments()).andReturn(segments).anyTimes();
EasyMock.expect(druidServer.getLazyAllSegments()).andReturn(segments).anyTimes();
EasyMock.expect(druidServer.getHost()).andReturn(name).anyTimes();
EasyMock.expect(druidServer.getType()).andReturn(ServerType.HISTORICAL).anyTimes();
if (!segments.isEmpty()) {

View File

@ -198,7 +198,7 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withAvailableSegmentsInTest(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withBalancerStrategy(balancerStrategy)
@ -304,7 +304,7 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withAvailableSegmentsInTest(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withBalancerStrategy(balancerStrategy)
@ -403,7 +403,7 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withAvailableSegmentsInTest(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategy(balancerStrategy)
@ -478,7 +478,7 @@ public class DruidCoordinatorRuleRunnerTest
new DruidCoordinatorRuntimeParams.Builder()
.withEmitter(emitter)
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withAvailableSegmentsInTest(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withBalancerStrategy(balancerStrategy)
@ -540,7 +540,7 @@ public class DruidCoordinatorRuleRunnerTest
new DruidCoordinatorRuntimeParams.Builder()
.withEmitter(emitter)
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withAvailableSegmentsInTest(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.build();
@ -609,7 +609,7 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMillisToWaitBeforeDeleting(0L).build())
.withAvailableSegments(availableSegments)
.withAvailableSegmentsInTest(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategy(balancerStrategy)
@ -695,7 +695,7 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMillisToWaitBeforeDeleting(0L).build())
.withAvailableSegments(availableSegments)
.withAvailableSegmentsInTest(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategy(balancerStrategy)
@ -786,7 +786,7 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMillisToWaitBeforeDeleting(0L).build())
.withAvailableSegments(availableSegments)
.withAvailableSegmentsInTest(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategy(balancerStrategy)
@ -865,7 +865,7 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMillisToWaitBeforeDeleting(0L).build())
.withAvailableSegments(availableSegments)
.withAvailableSegmentsInTest(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategy(balancerStrategy)
@ -963,7 +963,7 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMillisToWaitBeforeDeleting(0L).build())
.withAvailableSegments(availableSegments)
.withAvailableSegmentsInTest(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategy(balancerStrategy)
@ -1047,7 +1047,7 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withAvailableSegmentsInTest(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withBalancerStrategy(balancerStrategy)
@ -1077,7 +1077,7 @@ public class DruidCoordinatorRuleRunnerTest
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withEmitter(emitter)
.withAvailableSegments(Collections.singletonList(overFlowSegment))
.withAvailableSegmentsInTest(Collections.singletonList(overFlowSegment))
.withDatabaseRuleManager(databaseRuleManager)
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
@ -1180,7 +1180,7 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withAvailableSegmentsInTest(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
@ -1286,7 +1286,7 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMillisToWaitBeforeDeleting(0L).build())
.withAvailableSegments(longerAvailableSegments)
.withAvailableSegmentsInTest(longerAvailableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategy(balancerStrategy)
@ -1369,7 +1369,7 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withAvailableSegments(availableSegments)
.withAvailableSegmentsInTest(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withBalancerStrategy(balancerStrategy)

View File

@ -36,6 +36,7 @@ import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.SingleServerInventoryView;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.curator.CuratorTestBase;
import org.apache.druid.curator.CuratorUtils;
import org.apache.druid.curator.discovery.NoopServiceAnnouncer;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.jackson.DefaultObjectMapper;
@ -59,18 +60,15 @@ import org.apache.druid.timeline.SegmentId;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
@ -82,13 +80,15 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class DruidCoordinatorTest extends CuratorTestBase
{
private static final String LOADPATH = "/druid/loadqueue/localhost:1234";
private static final long COORDINATOR_START_DELAY = 1;
private static final long COORDINATOR_PERIOD = 100;
private DruidCoordinator coordinator;
private MetadataSegmentManager databaseSegmentManager;
private SingleServerInventoryView serverInventoryView;
private ScheduledExecutorFactory scheduledExecutorFactory;
private DruidServer druidServer;
private DruidServer druidServer2;
private DataSegment segment;
private ConcurrentMap<String, LoadQueuePeon> loadManagementPeons;
private LoadQueuePeon loadQueuePeon;
private MetadataRuleManager metadataRuleManager;
@ -97,12 +97,8 @@ public class DruidCoordinatorTest extends CuratorTestBase
private PathChildrenCache pathChildrenCache;
private DruidCoordinatorConfig druidCoordinatorConfig;
private ObjectMapper objectMapper;
private JacksonConfigManager configManager;
private DruidNode druidNode;
private LatchableServiceEmitter serviceEmitter = new LatchableServiceEmitter();
private static final String LOADPATH = "/druid/loadqueue/localhost:1234";
private static final long COORDINATOR_START_DELAY = 1;
private static final long COORDINATOR_PERIOD = 100;
@Before
public void setUp() throws Exception
@ -111,7 +107,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
serverInventoryView = EasyMock.createMock(SingleServerInventoryView.class);
databaseSegmentManager = EasyMock.createNiceMock(MetadataSegmentManager.class);
metadataRuleManager = EasyMock.createNiceMock(MetadataRuleManager.class);
configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
EasyMock.expect(
configManager.watch(
EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
@ -228,7 +224,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
@Test
public void testMoveSegment()
{
segment = EasyMock.createNiceMock(DataSegment.class);
final DataSegment segment = EasyMock.createNiceMock(DataSegment.class);
EasyMock.expect(segment.getId()).andReturn(SegmentId.dummy("dummySegment"));
EasyMock.expect(segment.getDataSource()).andReturn("dummyDataSource");
EasyMock.replay(segment);
@ -269,7 +265,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
).atLeastOnce();
EasyMock.replay(druidServer);
druidServer2 = EasyMock.createMock(DruidServer.class);
DruidServer druidServer2 = EasyMock.createMock(DruidServer.class);
EasyMock.expect(druidServer2.toImmutableDruidServer()).andReturn(
new ImmutableDruidServer(
@ -324,7 +320,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
EasyMock.replay(metadataRuleManager);
// Setup MetadataSegmentManager
DruidDataSource[] druidDataSources = {
DruidDataSource[] dataSources = {
new DruidDataSource(dataSource, Collections.emptyMap())
};
final DataSegment dataSegment = new DataSegment(
@ -338,13 +334,9 @@ public class DruidCoordinatorTest extends CuratorTestBase
0x9,
0
);
druidDataSources[0].addSegment(dataSegment);
dataSources[0].addSegment(dataSegment);
EasyMock.expect(databaseSegmentManager.isStarted()).andReturn(true).anyTimes();
EasyMock.expect(databaseSegmentManager.getDataSources()).andReturn(
ImmutableList.of(druidDataSources[0].toImmutableDruidDataSource())
).atLeastOnce();
EasyMock.replay(databaseSegmentManager);
setupMetadataSegmentManagerMock(dataSources[0]);
ImmutableDruidDataSource immutableDruidDataSource = EasyMock.createNiceMock(ImmutableDruidDataSource.class);
EasyMock.expect(immutableDruidDataSource.getSegments())
.andReturn(ImmutableSet.of(dataSegment)).atLeastOnce();
@ -372,9 +364,9 @@ public class DruidCoordinatorTest extends CuratorTestBase
new PathChildrenCacheListener()
{
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent)
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event)
{
if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
if (CuratorUtils.isChildAdded(event)) {
if (assignSegmentLatch.getCount() > 0) {
//Coordinator should try to assign segment to druidServer historical
//Simulate historical loading segment
@ -402,17 +394,19 @@ public class DruidCoordinatorTest extends CuratorTestBase
Assert.assertEquals(1, segmentAvailability.size());
Assert.assertEquals(0L, segmentAvailability.get(dataSource));
Map<String, ? extends Object2LongMap<String>> replicationStatus = coordinator.getReplicationStatus();
Assert.assertNotNull(replicationStatus);
Assert.assertEquals(1, replicationStatus.entrySet().size());
Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier =
coordinator.computeUnderReplicationCountsPerDataSourcePerTier();
Assert.assertNotNull(underReplicationCountsPerDataSourcePerTier);
Assert.assertEquals(1, underReplicationCountsPerDataSourcePerTier.size());
Object2LongMap<String> dataSourceMap = replicationStatus.get(tier);
Assert.assertNotNull(dataSourceMap);
Assert.assertEquals(1, dataSourceMap.size());
Assert.assertNotNull(dataSourceMap.get(dataSource));
Object2LongMap<String> underRepliicationCountsPerDataSource = underReplicationCountsPerDataSourcePerTier.get(tier);
Assert.assertNotNull(underRepliicationCountsPerDataSource);
Assert.assertEquals(1, underRepliicationCountsPerDataSource.size());
//noinspection deprecation
Assert.assertNotNull(underRepliicationCountsPerDataSource.get(dataSource));
// Simulated the adding of segment to druidServer during SegmentChangeRequestLoad event
// The load rules asks for 2 replicas, therefore 1 replica should still be pending
Assert.assertEquals(1L, dataSourceMap.getLong(dataSource));
Assert.assertEquals(1L, underRepliicationCountsPerDataSource.getLong(dataSource));
coordinator.stop();
leaderUnannouncerLatch.await();
@ -467,18 +461,17 @@ public class DruidCoordinatorTest extends CuratorTestBase
DruidDataSource[] druidDataSources = {new DruidDataSource(dataSource, Collections.emptyMap())};
dataSegments.values().forEach(druidDataSources[0]::addSegment);
setupMetadataSegmentManagerMock(druidDataSources[0]);
EasyMock.expect(metadataRuleManager.getRulesWithDefault(EasyMock.anyString()))
.andReturn(ImmutableList.of(hotTier, coldTier)).atLeastOnce();
EasyMock.expect(databaseSegmentManager.isStarted()).andReturn(true).anyTimes();
EasyMock.expect(databaseSegmentManager.getDataSources()).andReturn(
ImmutableList.of(druidDataSources[0].toImmutableDruidDataSource())
).atLeastOnce();
EasyMock.expect(serverInventoryView.getInventory())
.andReturn(ImmutableList.of(hotServer, coldServer))
.atLeastOnce();
EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes();
EasyMock.replay(metadataRuleManager, databaseSegmentManager, serverInventoryView);
EasyMock.replay(metadataRuleManager, serverInventoryView);
coordinator.start();
leaderAnnouncerLatch.await(); // Wait for this coordinator to become leader
@ -486,15 +479,8 @@ public class DruidCoordinatorTest extends CuratorTestBase
final CountDownLatch assignSegmentLatchHot = new CountDownLatch(2);
pathChildrenCache.getListenable().addListener(
(client, event) -> {
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
DataSegment segment = dataSegments
.entrySet()
.stream()
.filter(x -> event.getData().getPath().contains(x.getKey()))
.map(Map.Entry::getValue)
.findFirst()
.orElse(null);
if (CuratorUtils.isChildAdded(event)) {
DataSegment segment = findSegmentRelatedToCuratorEvent(dataSegments, event);
if (segment != null) {
hotServer.addDataSegment(segment);
curator.delete().guaranteed().forPath(event.getData().getPath());
@ -507,15 +493,9 @@ public class DruidCoordinatorTest extends CuratorTestBase
final CountDownLatch assignSegmentLatchCold = new CountDownLatch(1);
pathChildrenCacheCold.getListenable().addListener(
(client, event) -> {
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
DataSegment segment = dataSegments
.entrySet()
.stream()
.filter(x -> event.getData().getPath().contains(x.getKey()))
.map(Map.Entry::getValue)
.findFirst()
.orElse(null);
(CuratorFramework client, PathChildrenCacheEvent event) -> {
if (CuratorUtils.isChildAdded(event)) {
DataSegment segment = findSegmentRelatedToCuratorEvent(dataSegments, event);
if (segment != null) {
coldServer.addDataSegment(segment);
@ -536,10 +516,11 @@ public class DruidCoordinatorTest extends CuratorTestBase
Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getLoadStatus());
Map<String, ? extends Object2LongMap<String>> replicationStatus = coordinator.getReplicationStatus();
Assert.assertEquals(2, replicationStatus.entrySet().size());
Assert.assertEquals(0L, replicationStatus.get(hotTierName).getLong(dataSource));
Assert.assertEquals(0L, replicationStatus.get(coldTierName).getLong(dataSource));
Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier =
coordinator.computeUnderReplicationCountsPerDataSourcePerTier();
Assert.assertEquals(2, underReplicationCountsPerDataSourcePerTier.size());
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(hotTierName).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(coldTierName).getLong(dataSource));
coordinator.stop();
leaderUnannouncerLatch.await();
@ -549,52 +530,37 @@ public class DruidCoordinatorTest extends CuratorTestBase
EasyMock.verify(metadataRuleManager);
}
@Test
public void testOrderedAvailableDataSegments()
private void setupMetadataSegmentManagerMock(DruidDataSource dataSource)
{
DruidDataSource dataSource = new DruidDataSource("test", new HashMap());
DataSegment[] segments = new DataSegment[]{
getSegment("test", Intervals.of("2016-01-10T03:00:00Z/2016-01-10T04:00:00Z")),
getSegment("test", Intervals.of("2016-01-11T01:00:00Z/2016-01-11T02:00:00Z")),
getSegment("test", Intervals.of("2016-01-09T10:00:00Z/2016-01-09T11:00:00Z")),
getSegment("test", Intervals.of("2016-01-09T10:00:00Z/2016-01-09T12:00:00Z"))
};
for (DataSegment segment : segments) {
dataSource.addSegment(segment);
}
EasyMock.expect(databaseSegmentManager.getDataSources()).andReturn(
ImmutableList.of(dataSource.toImmutableDruidDataSource())
).atLeastOnce();
EasyMock.expect(databaseSegmentManager.isStarted()).andReturn(true).anyTimes();
EasyMock
.expect(databaseSegmentManager.iterateAllSegments())
.andReturn(dataSource.getSegments())
.anyTimes();
EasyMock
.expect(databaseSegmentManager.getDataSources())
.andReturn(Collections.singleton(dataSource.toImmutableDruidDataSource()))
.anyTimes();
EasyMock
.expect(databaseSegmentManager.getAllDataSourceNames())
.andReturn(Collections.singleton(dataSource.getName()))
.anyTimes();
EasyMock.replay(databaseSegmentManager);
Set<DataSegment> availableSegments = coordinator.getOrderedAvailableDataSegments();
DataSegment[] expected = new DataSegment[]{
getSegment("test", Intervals.of("2016-01-11T01:00:00Z/2016-01-11T02:00:00Z")),
getSegment("test", Intervals.of("2016-01-10T03:00:00Z/2016-01-10T04:00:00Z")),
getSegment("test", Intervals.of("2016-01-09T10:00:00Z/2016-01-09T12:00:00Z")),
getSegment("test", Intervals.of("2016-01-09T10:00:00Z/2016-01-09T11:00:00Z"))
};
Assert.assertEquals(expected.length, availableSegments.size());
Assert.assertEquals(expected, availableSegments.toArray());
EasyMock.verify(databaseSegmentManager);
}
private DataSegment getSegment(String dataSource, Interval interval)
@Nullable
private static DataSegment findSegmentRelatedToCuratorEvent(
Map<String, DataSegment> dataSegments,
PathChildrenCacheEvent event
)
{
// Not using EasyMock as it hampers the performance of multithreads.
DataSegment segment = new DataSegment(
dataSource,
interval,
"dummy_version",
new ConcurrentHashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
null,
0,
0L
);
return segment;
return dataSegments
.entrySet()
.stream()
.filter(x -> event.getData().getPath().contains(x.getKey()))
.map(Map.Entry::getValue)
.findFirst()
.orElse(null);
}
private static class TestDruidLeaderSelector implements DruidLeaderSelector

View File

@ -138,7 +138,7 @@ public class ReservoirSegmentSamplerTest
EasyMock.expect(druidServer1.getName()).andReturn("1").atLeastOnce();
EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer1.getSegments()).andReturn(segments1).anyTimes();
EasyMock.expect(druidServer1.getLazyAllSegments()).andReturn(segments1).anyTimes();
EasyMock.expect(druidServer1.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer1);
@ -146,7 +146,7 @@ public class ReservoirSegmentSamplerTest
EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer2.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer2.getSegments()).andReturn(segments2).anyTimes();
EasyMock.expect(druidServer2.getLazyAllSegments()).andReturn(segments2).anyTimes();
EasyMock.expect(druidServer2.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer2);
@ -154,7 +154,7 @@ public class ReservoirSegmentSamplerTest
EasyMock.expect(druidServer3.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer3.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(druidServer3.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer3.getSegments()).andReturn(segments3).anyTimes();
EasyMock.expect(druidServer3.getLazyAllSegments()).andReturn(segments3).anyTimes();
EasyMock.expect(druidServer3.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer3);
@ -162,7 +162,7 @@ public class ReservoirSegmentSamplerTest
EasyMock.expect(druidServer4.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer4.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(druidServer4.getMaxSize()).andReturn(100L).atLeastOnce();
EasyMock.expect(druidServer4.getSegments()).andReturn(segments4).anyTimes();
EasyMock.expect(druidServer4.getLazyAllSegments()).andReturn(segments4).anyTimes();
EasyMock.expect(druidServer4.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer4);

View File

@ -115,7 +115,7 @@ public class CachingCostBalancerStrategyTest
{
ClusterCostCache.Builder builder = ClusterCostCache.builder();
serverHolders.forEach(
s -> s.getServer().getSegments().forEach(segment -> builder.addSegment(s.getServer().getName(), segment))
s -> s.getServer().getLazyAllSegments().forEach(segment -> builder.addSegment(s.getServer().getName(), segment))
);
return new CachingCostBalancerStrategy(builder.build(), listeningExecutorService);
}

View File

@ -117,8 +117,9 @@ public class DruidCoordinatorCleanupOvershadowedTest
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
));
DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams.newBuilder()
.withAvailableSegments(availableSegments)
DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams
.newBuilder()
.withAvailableSegmentsInTest(availableSegments)
.withCoordinatorStats(new CoordinatorStats())
.withDruidCluster(druidCluster)
.build();

View File

@ -286,7 +286,7 @@ public class BroadcastDistributionRuleTest
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(
.withAvailableSegmentsInTest(
smallSegment,
largeSegments.get(0),
largeSegments.get(1),
@ -337,7 +337,7 @@ public class BroadcastDistributionRuleTest
.withDruidCluster(secondCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(secondCluster))
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(
.withAvailableSegmentsInTest(
smallSegment,
largeSegments.get(0),
largeSegments.get(1)
@ -366,7 +366,7 @@ public class BroadcastDistributionRuleTest
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(
.withAvailableSegmentsInTest(
smallSegment,
largeSegments.get(0),
largeSegments.get(1),
@ -404,7 +404,7 @@ public class BroadcastDistributionRuleTest
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(
.withAvailableSegmentsInTest(
smallSegment,
largeSegments.get(0),
largeSegments.get(1),

View File

@ -182,7 +182,7 @@ public class LoadRuleTest
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(segment).build(),
.withAvailableSegmentsInTest(segment).build(),
segment
);
@ -253,7 +253,7 @@ public class LoadRuleTest
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(segment).build(),
.withAvailableSegmentsInTest(segment).build(),
segment
);
@ -303,7 +303,7 @@ public class LoadRuleTest
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(segment).build(),
.withAvailableSegmentsInTest(segment).build(),
segment
);
@ -393,7 +393,7 @@ public class LoadRuleTest
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(segment).build(),
.withAvailableSegmentsInTest(segment).build(),
segment
);
@ -482,7 +482,7 @@ public class LoadRuleTest
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(segment).build(),
.withAvailableSegmentsInTest(segment).build(),
segment
);
@ -541,7 +541,7 @@ public class LoadRuleTest
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(segment).build(),
.withAvailableSegmentsInTest(segment).build(),
segment
);
@ -614,7 +614,7 @@ public class LoadRuleTest
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(segment).build(),
.withAvailableSegmentsInTest(segment).build(),
segment
);
@ -671,7 +671,7 @@ public class LoadRuleTest
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(dataSegment1, dataSegment2, dataSegment3)
.withAvailableSegmentsInTest(dataSegment1, dataSegment2, dataSegment3)
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsInNodeLoadingQueue(2).build())
.build();
@ -728,7 +728,7 @@ public class LoadRuleTest
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(segment).build(),
.withAvailableSegmentsInTest(segment).build(),
segment
);
@ -785,7 +785,7 @@ public class LoadRuleTest
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(segment).build(),
.withAvailableSegmentsInTest(segment).build(),
segment
);
@ -838,7 +838,7 @@ public class LoadRuleTest
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(segment1, segment2)
.withAvailableSegmentsInTest(segment1, segment2)
.build();
CoordinatorStats stats = rule.run(
null,
@ -904,7 +904,7 @@ public class LoadRuleTest
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withAvailableSegments(segment1)
.withAvailableSegmentsInTest(segment1)
.build();
CoordinatorStats stats = rule.run(
null,

View File

@ -479,7 +479,7 @@ public class SystemSchema extends AbstractSchema
final List<ImmutableDruidServer> druidServers = serverView.getDruidServers();
final int serverSegmentsTableSize = SERVER_SEGMENTS_SIGNATURE.getRowOrder().size();
for (ImmutableDruidServer druidServer : druidServers) {
for (DataSegment segment : druidServer.getSegments()) {
for (DataSegment segment : druidServer.getLazyAllSegments()) {
Object[] row = new Object[serverSegmentsTableSize];
row[0] = druidServer.getHost();
row[1] = segment.getId();

View File

@ -268,17 +268,13 @@ public class DruidSchemaTest extends CalciteTestBase
SegmentMetadataHolder updatedHolder = SegmentMetadataHolder.from(existingHolder).withNumRows(5).build();
schema.setSegmentMetadataHolder(existingSegment, updatedHolder);
// find a druidServer holding existingSegment
final Pair<ImmutableDruidServer, DataSegment> pair = druidServers.stream()
.flatMap(druidServer -> druidServer.getSegments()
final Pair<ImmutableDruidServer, DataSegment> pair = druidServers
.stream()
.filter(segment -> segment
.equals(
existingSegment))
.map(segment -> Pair
.of(
druidServer,
segment
)))
.flatMap(druidServer -> druidServer
.getLazyAllSegments().stream()
.filter(segment -> segment.equals(existingSegment))
.map(segment -> Pair.of(druidServer, segment))
)
.findAny()
.orElse(null);
Assert.assertNotNull(pair);