diff --git a/.idea/inspectionProfiles/Druid.xml b/.idea/inspectionProfiles/Druid.xml
index b2dcda3cd58..77078ae296f 100644
--- a/.idea/inspectionProfiles/Druid.xml
+++ b/.idea/inspectionProfiles/Druid.xml
@@ -269,6 +269,30 @@
+ [added inspection-profile entries; XML content lost in extraction]
@@ -278,6 +302,10 @@
+ [added inspection-profile entries; XML content lost in extraction]
diff --git a/core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java b/core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
index a25d73e4c13..2850c50200f 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
@@ -88,9 +88,9 @@ public class ScheduledExecutors
public void run()
{
try {
- log.debug("Running %s (delay %s)", callable, delay);
+ log.trace("Running %s (delay %s)", callable, delay);
if (callable.call() == Signal.REPEAT) {
- log.debug("Rescheduling %s (delay %s)", callable, delay);
+ log.trace("Rescheduling %s (delay %s)", callable, delay);
exec.schedule(this, delay.getMillis(), TimeUnit.MILLISECONDS);
} else {
log.debug("Stopped rescheduling %s (delay %s)", callable, delay);
@@ -154,7 +154,7 @@ public class ScheduledExecutors
}
try {
- log.debug("Running %s (period %s)", callable, rate);
+ log.trace("Running %s (period %s)", callable, rate);
prevSignal = callable.call();
}
catch (Throwable e) {
diff --git a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java
new file mode 100644
index 00000000000..6b4d3cc3df4
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.utils;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Spliterator;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+
+public final class CollectionUtils
+{
+ /**
+ * Returns a lazy collection from a stream supplier and a size. {@link Collection#iterator()} of the returned
+ * collection delegates to {@link Stream#iterator()} on the stream returned from the supplier.
+ */
+ public static <E> Collection<E> createLazyCollectionFromStream(Supplier<Stream<E>> sequentialStreamSupplier, int size)
+ {
+ return new AbstractCollection<E>()
+ {
+ @Override
+ public Iterator<E> iterator()
+ {
+ return sequentialStreamSupplier.get().iterator();
+ }
+
+ @Override
+ public Spliterator<E> spliterator()
+ {
+ return sequentialStreamSupplier.get().spliterator();
+ }
+
+ @Override
+ public Stream<E> stream()
+ {
+ return sequentialStreamSupplier.get();
+ }
+
+ @Override
+ public Stream<E> parallelStream()
+ {
+ return sequentialStreamSupplier.get().parallel();
+ }
+
+ @Override
+ public int size()
+ {
+ return size;
+ }
+ };
+ }
+
+ private CollectionUtils() {}
+}
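A minimal usage sketch of the new helper (the example class and data are hypothetical, not part of the patch). The important property is that the returned collection re-invokes the supplier on every traversal: nothing is cached, so repeated iteration repeats the work, while size() stays O(1) because it is precomputed.

import org.apache.druid.utils.CollectionUtils;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class LazyCollectionExample
{
  public static void main(String[] args)
  {
    // Hypothetical data standing in for per-dataSource segment lists.
    List<List<String>> nested = Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"));
    int size = nested.stream().mapToInt(List::size).sum();
    Collection<String> lazy = CollectionUtils.createLazyCollectionFromStream(
        () -> nested.stream().flatMap(List::stream),
        size
    );
    lazy.forEach(System.out::println); // prints a, b, c; a second forEach would re-run the supplier
    System.out.println(lazy.size());   // 3, without touching the stream
  }
}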
diff --git a/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/AmbariMetricsEmitterModule.java b/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/AmbariMetricsEmitterModule.java
index 4468f63d14c..bdb3bf17195 100644
--- a/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/AmbariMetricsEmitterModule.java
+++ b/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/AmbariMetricsEmitterModule.java
@@ -20,8 +20,6 @@
package org.apache.druid.emitter.ambari.metrics;
import com.fasterxml.jackson.databind.Module;
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
@@ -35,6 +33,7 @@ import org.apache.druid.java.util.emitter.core.Emitter;
import java.util.Collections;
import java.util.List;
+import java.util.stream.Collectors;
public class AmbariMetricsEmitterModule implements DruidModule
{
@@ -57,17 +56,11 @@ public class AmbariMetricsEmitterModule implements DruidModule
@Named(EMITTER_TYPE)
public Emitter getEmitter(AmbariMetricsEmitterConfig emitterConfig, final Injector injector)
{
- List<Emitter> emitters = Lists.transform(
- emitterConfig.getAlertEmitters(),
- new Function<String, Emitter>()
- {
- @Override
- public Emitter apply(String s)
- {
- return injector.getInstance(Key.get(Emitter.class, Names.named(s)));
- }
- }
- );
+ List<Emitter> emitters = emitterConfig
+ .getAlertEmitters()
+ .stream()
+ .map((String name) -> injector.getInstance(Key.get(Emitter.class, Names.named(name))))
+ .collect(Collectors.toList());
return new AmbariMetricsEmitter(emitterConfig, emitters);
}
}
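Note that this refactoring is a subtle behavior change, not just a syntax cleanup: Guava's Lists.transform returns a lazy view that re-applies the function on every element access, whereas collecting a stream materializes the list once, so each alert emitter is now resolved from the injector exactly once. A sketch of the difference (hypothetical example class, not part of the patch):

import com.google.common.collect.Lists;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class TransformVsCollect
{
  public static void main(String[] args)
  {
    AtomicInteger calls = new AtomicInteger();
    List<String> names = Arrays.asList("one", "two");

    // Lazy view: the function runs again on every get().
    List<Integer> lazy = Lists.transform(names, name -> calls.incrementAndGet());
    lazy.get(0);
    lazy.get(0);
    System.out.println(calls.get()); // 2: two accesses, two invocations

    calls.set(0);
    // Eager collect: one invocation per element, none on later accesses.
    List<Integer> eager = names.stream().map(name -> calls.incrementAndGet()).collect(Collectors.toList());
    eager.get(0);
    eager.get(0);
    System.out.println(calls.get()); // 2: one per element during collect
  }
}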
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java
index 9aa4e5f1e15..d577233521d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java
@@ -27,6 +27,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.curator.CuratorUtils;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
@@ -156,12 +157,12 @@ public class WorkerTaskMonitor extends WorkerTaskManager
new PathChildrenCacheListener()
{
@Override
- public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent)
+ public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event)
throws Exception
{
- if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
+ if (CuratorUtils.isChildAdded(event)) {
final Task task = jsonMapper.readValue(
- cf.getData().forPath(pathChildrenCacheEvent.getData().getPath()),
+ cf.getData().forPath(event.getData().getPath()),
Task.class
);
diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
index 4d0616cdbb4..7df034a0b6e 100644
--- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java
+++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
@@ -219,7 +219,7 @@ public class BrokerServerView implements TimelineServerView
private QueryableDruidServer removeServer(DruidServer server)
{
- for (DataSegment segment : server.getSegments()) {
+ for (DataSegment segment : server.iterateAllSegments()) {
serverRemovedSegment(server.getMetadata(), segment);
}
return clients.remove(server.getName());
diff --git a/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java b/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java
index 91ebe381dc4..1f1d801d9e3 100644
--- a/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java
+++ b/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java
@@ -123,7 +123,7 @@ public class CoordinatorServerView implements InventoryView
private void removeServer(DruidServer server)
{
- for (DataSegment segment : server.getSegments()) {
+ for (DataSegment segment : server.iterateAllSegments()) {
serverRemovedSegment(server.getMetadata(), segment);
}
}
diff --git a/server/src/main/java/org/apache/druid/client/DruidServer.java b/server/src/main/java/org/apache/druid/client/DruidServer.java
index ca825f62e49..ac73d63febf 100644
--- a/server/src/main/java/org/apache/druid/client/DruidServer.java
+++ b/server/src/main/java/org/apache/druid/client/DruidServer.java
@@ -153,15 +153,27 @@ public class DruidServer implements Comparable<DruidServer>
return metadata.getHostAndTlsPort() != null ? "https" : "http";
}
- public Iterable<DataSegment> getSegments()
+ /**
+ * Returns an iterable to go over all segments in all data sources stored on this DruidServer. The order in which
+ * segments are iterated is unspecified.
+ *
+ * Since this DruidServer can be mutated concurrently, the set of segments observed during an iteration may _not_ be
+ * a momentary snapshot of the segments on the server, in other words, it may be that there was no moment when the
+ * DruidServer stored exactly the returned set of segments.
+ *
+ * Note: the iteration may not be as trivially cheap as, for example, iteration over an ArrayList. Try (to some
+ * reasonable extent) to organize the code so that it iterates the returned iterable only once rather than several
+ * times.
+ */
+ public Iterable<DataSegment> iterateAllSegments()
{
return () -> dataSources.values().stream().flatMap(dataSource -> dataSource.getSegments().stream()).iterator();
}
/**
* Returns the current number of segments stored in this DruidServer object. This number is weakly consistent with
- * the number of segments if {@link #getSegments} is iterated about the same time, because segments might be added or
- * removed in parallel.
+ * the number of segments if {@link #iterateAllSegments} is iterated about the same time, because segments might be
+ * added or removed in parallel.
*/
public int getTotalSegments()
{
@@ -200,7 +212,7 @@ public class DruidServer implements Comparable<DruidServer>
public DruidServer addDataSegments(DruidServer server)
{
- server.getSegments().forEach(this::addDataSegment);
+ server.iterateAllSegments().forEach(this::addDataSegment);
return this;
}
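To follow the javadoc's advice on iterateAllSegments(), callers should aggregate everything they need in one pass. A hypothetical sketch (the helper class is not part of the patch):

import org.apache.druid.client.DruidServer;
import org.apache.druid.timeline.DataSegment;

public class SegmentIterationExample
{
  /** Computes the total payload size in a single traversal instead of iterating the iterable twice. */
  static long totalSegmentSize(DruidServer server)
  {
    long totalSize = 0;
    for (DataSegment segment : server.iterateAllSegments()) {
      totalSize += segment.getSize();
    }
    return totalSize;
  }
}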
diff --git a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
index 1279b3af41c..05154143b21 100644
--- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
+++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
@@ -70,16 +70,17 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
- * This class uses internal-discovery i.e. {@link DruidNodeDiscoveryProvider} to discover various queryable nodes in the cluster
- * such as historicals and realtime peon processes.
- * For each queryable server, it uses HTTP GET /druid-internal/v1/segments (see docs in SegmentListerResource.getSegments(..),
- * to keep sync'd state of segments served by those servers.
+ * This class uses internal-discovery i.e. {@link DruidNodeDiscoveryProvider} to discover various queryable nodes in the
+ * cluster such as historicals and realtime peon processes.
+ *
+ * For each queryable server, it uses HTTP GET /druid-internal/v1/segments (see docs for {@link
+ * org.apache.druid.server.http.SegmentListerResource#getSegments}), to keep sync'd state of segments served by those
+ * servers.
*/
public class HttpServerInventoryView implements ServerInventoryView, FilteredServerInventoryView
{
- public static final TypeReference<ChangeRequestsSnapshot<DataSegmentChangeRequest>> SEGMENT_LIST_RESP_TYPE_REF = new TypeReference<ChangeRequestsSnapshot<DataSegmentChangeRequest>>()
- {
- };
+ public static final TypeReference<ChangeRequestsSnapshot<DataSegmentChangeRequest>> SEGMENT_LIST_RESP_TYPE_REF =
+ new TypeReference<ChangeRequestsSnapshot<DataSegmentChangeRequest>>() {};
private final EmittingLogger log = new EmittingLogger(HttpServerInventoryView.class);
private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
@@ -545,7 +546,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredServerInventoryView
public void fullSync(List<DataSegmentChangeRequest> changes)
{
Map<SegmentId, DataSegment> toRemove = Maps.newHashMapWithExpectedSize(druidServer.getTotalSegments());
- druidServer.getSegments().forEach(segment -> toRemove.put(segment.getId(), segment));
+ druidServer.iterateAllSegments().forEach(segment -> toRemove.put(segment.getId(), segment));
for (DataSegmentChangeRequest request : changes) {
if (request instanceof SegmentChangeRequestLoad) {
diff --git a/server/src/main/java/org/apache/druid/client/ImmutableDruidServer.java b/server/src/main/java/org/apache/druid/client/ImmutableDruidServer.java
index b6d893ff52a..d01ad961a70 100644
--- a/server/src/main/java/org/apache/druid/client/ImmutableDruidServer.java
+++ b/server/src/main/java/org/apache/druid/client/ImmutableDruidServer.java
@@ -26,10 +26,10 @@ import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.utils.CollectionUtils;
-import java.util.AbstractCollection;
+import javax.annotation.Nullable;
import java.util.Collection;
-import java.util.Iterator;
/**
* This class should not be subclassed, it isn't declared final only to make it possible to mock the class with EasyMock
@@ -107,6 +107,7 @@ public class ImmutableDruidServer
return metadata.getPriority();
}
+ @Nullable
public DataSegment getSegment(SegmentId segmentId)
{
ImmutableDruidDataSource dataSource = dataSources.get(segmentId.getDataSource());
@@ -126,22 +127,22 @@ public class ImmutableDruidServer
return dataSources.get(name);
}
- public Collection<DataSegment> getSegments()
+ /**
+ * Returns a lazy collection with all segments in all data sources stored on this ImmutableDruidServer. The order
+ * of segments in this collection is unspecified.
+ *
+ * Calling {@link Collection#size()} on the returned collection is cheap, O(1).
+ *
+ * Note: iteration over the returned collection may not be as trivially cheap as, for example, iteration over an
+ * ArrayList. Try (to some reasonable extent) to organize the code so that it iterates the returned collection only
+ * once rather than several times.
+ */
+ public Collection<DataSegment> getLazyAllSegments()
{
- return new AbstractCollection<DataSegment>()
- {
- @Override
- public Iterator<DataSegment> iterator()
- {
- return dataSources.values().stream().flatMap(dataSource -> dataSource.getSegments().stream()).iterator();
- }
-
- @Override
- public int size()
- {
- return totalSegments;
- }
- };
+ return CollectionUtils.createLazyCollectionFromStream(
+ () -> dataSources.values().stream().flatMap(dataSource -> dataSource.getSegments().stream()),
+ totalSegments
+ );
}
public String getURL()
diff --git a/server/src/main/java/org/apache/druid/curator/CuratorUtils.java b/server/src/main/java/org/apache/druid/curator/CuratorUtils.java
index b89a645b9b2..91e26af476e 100644
--- a/server/src/main/java/org/apache/druid/curator/CuratorUtils.java
+++ b/server/src/main/java/org/apache/druid/curator/CuratorUtils.java
@@ -20,6 +20,7 @@
package org.apache.druid.curator;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.zookeeper.CreateMode;
@@ -127,4 +128,9 @@ public class CuratorUtils
);
}
}
+
+ public static boolean isChildAdded(PathChildrenCacheEvent event)
+ {
+ return event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED);
+ }
}
diff --git a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java
index e33e3543945..436ad125bf7 100644
--- a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java
@@ -21,6 +21,7 @@ package org.apache.druid.metadata;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.client.ImmutableDruidDataSource;
+import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
@@ -59,6 +60,14 @@ public interface MetadataSegmentManager
Collection<ImmutableDruidDataSource> getDataSources();
+ /**
+ * Returns an iterable to go over all segments in all data sources. The order in which segments are iterated is
+ * unspecified. Note: the iteration may not be as trivially cheap as, for example, iteration over an ArrayList. Try
+ * (to some reasonable extent) to organize the code so that it iterates the returned iterable only once rather than
+ * several times.
+ */
+ Iterable<DataSegment> iterateAllSegments();
+
Collection<String> getAllDataSourceNames();
/**
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
index ec73dc5c5e8..dff9c9df61c 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
@@ -435,6 +435,12 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
.collect(Collectors.toList());
}
+ @Override
+ public Iterable<DataSegment> iterateAllSegments()
+ {
+ return () -> dataSources.values().stream().flatMap(dataSource -> dataSource.getSegments().stream()).iterator();
+ }
+
@Override
public Collection<String> getAllDataSourceNames()
{
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java
index a75642f9baf..552227eaf3c 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java
@@ -55,8 +55,10 @@ public interface BalancerStrategy
/**
* Pick the best segment to move from one of the supplied set of servers according to the balancing strategy.
* @param serverHolders set of historicals to consider for moving segments
- * @return {@link BalancerSegmentHolder} containing segment to move and server it current resides on
+ * @return {@link BalancerSegmentHolder} containing segment to move and server it currently resides on, or null if
+ * there are no segments to pick from (i. e. all provided serverHolders are empty).
*/
+ @Nullable
BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders);
/**
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
index 5bf4f381bac..d2d3029b17b 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
@@ -258,7 +258,7 @@ public class CostBalancerStrategy implements BalancerStrategy
{
double cost = 0;
for (ServerHolder server : serverHolders) {
- Iterable<DataSegment> segments = server.getServer().getSegments();
+ Iterable<DataSegment> segments = server.getServer().getLazyAllSegments();
for (DataSegment s : segments) {
cost += computeJointSegmentsCost(s, segments);
}
@@ -280,7 +280,7 @@ public class CostBalancerStrategy implements BalancerStrategy
{
double cost = 0;
for (ServerHolder server : serverHolders) {
- for (DataSegment segment : server.getServer().getSegments()) {
+ for (DataSegment segment : server.getServer().getLazyAllSegments()) {
cost += computeJointSegmentsCost(segment, segment);
}
}
@@ -334,7 +334,7 @@ public class CostBalancerStrategy implements BalancerStrategy
// the sum of the costs of other (exclusive of the proposalSegment) segments on the server
cost += computeJointSegmentsCost(
proposalSegment,
- Iterables.filter(server.getServer().getSegments(), segment -> !proposalSegment.equals(segment))
+ Iterables.filter(server.getServer().getLazyAllSegments(), segment -> !proposalSegment.equals(segment))
);
// plus the costs of segments that will be loaded
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
index 2c9a334c108..a6b10bc345c 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
@@ -79,13 +79,13 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
private final AtomicInteger failedAssignCount = new AtomicInteger(0);
private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentSkipListMap<>(
- DruidCoordinator.SEGMENT_COMPARATOR
+ DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST
);
private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentSkipListMap<>(
- DruidCoordinator.SEGMENT_COMPARATOR
+ DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST
);
private final ConcurrentSkipListSet<DataSegment> segmentsMarkedToDrop = new ConcurrentSkipListSet<>(
- DruidCoordinator.SEGMENT_COMPARATOR
+ DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST
);
private final Object lock = new Object();
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DiskNormalizedCostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/DiskNormalizedCostBalancerStrategy.java
index 2e065be24a1..dff28710a00 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DiskNormalizedCostBalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DiskNormalizedCostBalancerStrategy.java
@@ -47,8 +47,8 @@ public class DiskNormalizedCostBalancerStrategy extends CostBalancerStrategy
}
int nSegments = 1;
- if (server.getServer().getSegments().size() > 0) {
- nSegments = server.getServer().getSegments().size();
+ if (server.getServer().getLazyAllSegments().size() > 0) {
+ nSegments = server.getServer().getLazyAllSegments().size();
}
double normalizedCost = cost / nSegments;
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 98c97a04272..3b5faef7a04 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -81,7 +81,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -93,10 +92,29 @@ import java.util.stream.Collectors;
@ManageLifecycle
public class DruidCoordinator
{
- public static Comparator<DataSegment> SEGMENT_COMPARATOR = Ordering.from(Comparators.intervalsByEndThenStart())
- .onResultOf(DataSegment::getInterval)
- .compound(Ordering.natural())
- .reverse();
+ /**
+ * This comparator orders "freshest" segments first, i. e. segments with most recent intervals.
+ *
+ * It is used in historical nodes' {@link LoadQueuePeon}s to make historicals load more recent segments first.
+ *
+ * It is also used in {@link DruidCoordinatorRuntimeParams} for {@link
+ * DruidCoordinatorRuntimeParams#getAvailableSegments()} - a collection of segments to be considered during some
+ * coordinator run for different {@link DruidCoordinatorHelper}s. The order matters only for {@link
+ * DruidCoordinatorRuleRunner}, which tries to apply the rules while iterating the segments in the order imposed by
+ * this comparator. In {@link LoadRule} the throttling limit may be hit (via {@link ReplicationThrottler}; see
+ * {@link CoordinatorDynamicConfig#getReplicationThrottleLimit()}). So before we potentially hit this limit, we want
+ * to schedule loading the more recent segments (among all of those that need to be loaded).
+ *
+ * In both {@link LoadQueuePeon}s and {@link DruidCoordinatorRuleRunner}, we want to load more recent segments first
+ * because presumably they are queried more often and contain more important data for users, so if the Druid
+ * cluster has availability problems and is struggling to make all segments available immediately, at least we try to
+ * make more "important" (more recent) segments available as soon as possible.
+ */
+ static final Comparator<DataSegment> SEGMENT_COMPARATOR_RECENT_FIRST = Ordering
+ .from(Comparators.intervalsByEndThenStart())
+ .onResultOf(DataSegment::getInterval)
+ .compound(Ordering.natural())
+ .reverse();
private static final EmittingLogger log = new EmittingLogger(DruidCoordinator.class);
@@ -224,17 +242,18 @@ public class DruidCoordinator
return loadManagementPeons;
}
- public Map<String, Object2LongMap<String>> getReplicationStatus()
+ /** @return tier -> { dataSource -> underReplicationCount } map */
+ public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTier()
{
- final Map<String, Object2LongMap<String>> retVal = new HashMap<>();
+ final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
if (segmentReplicantLookup == null) {
- return retVal;
+ return underReplicationCountsPerDataSourcePerTier;
}
final DateTime now = DateTimes.nowUtc();
- for (final DataSegment segment : getAvailableDataSegments()) {
+ for (final DataSegment segment : iterateAvailableDataSegments()) {
final List<Rule> rules = metadataRuleManager.getRulesWithDefault(segment.getDataSource());
for (final Rule rule : rules) {
@@ -246,15 +265,16 @@ public class DruidCoordinator
.getTieredReplicants()
.forEach((final String tier, final Integer ruleReplicants) -> {
int currentReplicants = segmentReplicantLookup.getLoadedReplicants(segment.getId(), tier);
- retVal
- .computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>())
+ Object2LongMap<String> underReplicationPerDataSource = underReplicationCountsPerDataSourcePerTier
+ .computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>());
+ ((Object2LongOpenHashMap<String>) underReplicationPerDataSource)
.addTo(segment.getDataSource(), Math.max(ruleReplicants - currentReplicants, 0));
});
break; // only the first matching rule applies
}
}
- return retVal;
+ return underReplicationCountsPerDataSourcePerTier;
}
public Object2LongMap<String> getSegmentAvailability()
@@ -265,7 +285,7 @@ public class DruidCoordinator
return retVal;
}
- for (DataSegment segment : getAvailableDataSegments()) {
+ for (DataSegment segment : iterateAvailableDataSegments()) {
if (segmentReplicantLookup.getLoadedReplicants(segment.getId()) == 0) {
retVal.addTo(segment.getDataSource(), 1);
} else {
@@ -428,30 +448,15 @@ public class DruidCoordinator
}
}
- public Set<DataSegment> getOrderedAvailableDataSegments()
+ /**
+ * Returns an iterable to go over all available segments in all data sources. The order in which segments are iterated
+ * is unspecified. Note: the iteration may not be as trivially cheap as, for example, iteration over an ArrayList. Try
+ * (to some reasonable extent) to organize the code so that it iterates the returned iterable only once rather than
+ * several times.
+ */
+ public Iterable<DataSegment> iterateAvailableDataSegments()
{
- Set<DataSegment> availableSegments = new TreeSet<>(SEGMENT_COMPARATOR);
-
- Iterable<DataSegment> dataSegments = getAvailableDataSegments();
-
- for (DataSegment dataSegment : dataSegments) {
- if (dataSegment.getSize() < 0) {
- log.makeAlert("No size on Segment, wtf?")
- .addData("segment", dataSegment)
- .emit();
- }
- availableSegments.add(dataSegment);
- }
-
- return availableSegments;
- }
-
- private List<DataSegment> getAvailableDataSegments()
- {
- return metadataSegmentManager.getDataSources()
- .stream()
- .flatMap(source -> source.getSegments().stream())
- .collect(Collectors.toList());
+ return metadataSegmentManager.iterateAllSegments();
}
@LifecycleStart
@@ -735,7 +740,7 @@ public class DruidCoordinator
.build();
},
new DruidCoordinatorRuleRunner(DruidCoordinator.this),
- new DruidCoordinatorCleanupUnneeded(DruidCoordinator.this),
+ new DruidCoordinatorCleanupUnneeded(),
new DruidCoordinatorCleanupOvershadowed(DruidCoordinator.this),
new DruidCoordinatorBalancer(DruidCoordinator.this),
new DruidCoordinatorLogger(DruidCoordinator.this)
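To make the ordering of SEGMENT_COMPARATOR_RECENT_FIRST (defined above in DruidCoordinator) concrete, here is a sketch isolating its interval component (hypothetical example class; assumes Intervals and Comparators from the Druid codebase behave as in the sources above):

import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.Comparators;
import org.joda.time.Interval;

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class RecentFirstOrderingExample
{
  public static void main(String[] args)
  {
    // Reversing "intervals by end, then start" puts later-ending (more recent) intervals first.
    Comparator<Interval> recentFirst = Comparators.intervalsByEndThenStart().reversed();
    List<Interval> intervals = Arrays.asList(
        Intervals.of("2019-01-01/2019-01-02"),
        Intervals.of("2019-03-01/2019-03-02")
    );
    intervals.sort(recentFirst);
    System.out.println(intervals); // the 2019-03 interval comes first
  }
}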
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
index 08390385045..655d6bdf22f 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
@@ -20,6 +20,7 @@
package org.apache.druid.server.coordinator;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -28,24 +29,35 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.DateTime;
+import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.Set;
import java.util.TreeSet;
/**
*/
public class DruidCoordinatorRuntimeParams
{
+ /**
+ * Creates a TreeSet sorted in {@link DruidCoordinator#SEGMENT_COMPARATOR_RECENT_FIRST} order and populates it with
+ * the segments from the given iterable. The given iterable is iterated exactly once. No special action is taken if
+ * duplicate segments are encountered in the iterable.
+ */
+ public static TreeSet<DataSegment> createAvailableSegmentsSet(Iterable<DataSegment> availableSegments)
+ {
+ TreeSet<DataSegment> segmentsSet = new TreeSet<>(DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST);
+ availableSegments.forEach(segmentsSet::add);
+ return segmentsSet;
+ }
+
private final long startTime;
private final DruidCluster druidCluster;
private final MetadataRuleManager databaseRuleManager;
private final SegmentReplicantLookup segmentReplicantLookup;
private final Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources;
- private final Set<DataSegment> availableSegments;
+ private final @Nullable TreeSet<DataSegment> availableSegments;
private final Map<String, LoadQueuePeon> loadManagementPeons;
private final ReplicationThrottler replicationManager;
private final ServiceEmitter emitter;
@@ -61,7 +73,7 @@ public class DruidCoordinatorRuntimeParams
MetadataRuleManager databaseRuleManager,
SegmentReplicantLookup segmentReplicantLookup,
Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources,
- Set<DataSegment> availableSegments,
+ @Nullable TreeSet<DataSegment> availableSegments,
Map<String, LoadQueuePeon> loadManagementPeons,
ReplicationThrottler replicationManager,
ServiceEmitter emitter,
@@ -113,8 +125,9 @@ public class DruidCoordinatorRuntimeParams
return dataSources;
}
- public Set<DataSegment> getAvailableSegments()
+ public TreeSet<DataSegment> getAvailableSegments()
{
+ Preconditions.checkState(availableSegments != null, "availableSegments must be set");
return availableSegments;
}
@@ -196,7 +209,7 @@ public class DruidCoordinatorRuntimeParams
databaseRuleManager,
segmentReplicantLookup,
dataSources,
- new TreeSet<>(DruidCoordinator.SEGMENT_COMPARATOR),
+ null, // availableSegments
loadManagementPeons,
replicationManager,
emitter,
@@ -215,7 +228,7 @@ public class DruidCoordinatorRuntimeParams
private MetadataRuleManager databaseRuleManager;
private SegmentReplicantLookup segmentReplicantLookup;
private Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources;
- private final Set<DataSegment> availableSegments;
+ private @Nullable TreeSet<DataSegment> availableSegments;
private final Map<String, LoadQueuePeon> loadManagementPeons;
private ReplicationThrottler replicationManager;
private ServiceEmitter emitter;
@@ -232,7 +245,7 @@ public class DruidCoordinatorRuntimeParams
this.databaseRuleManager = null;
this.segmentReplicantLookup = null;
this.dataSources = new HashMap<>();
- this.availableSegments = new TreeSet<>(DruidCoordinator.SEGMENT_COMPARATOR);
+ this.availableSegments = null;
this.loadManagementPeons = new HashMap<>();
this.replicationManager = null;
this.emitter = null;
@@ -248,7 +261,7 @@ public class DruidCoordinatorRuntimeParams
MetadataRuleManager databaseRuleManager,
SegmentReplicantLookup segmentReplicantLookup,
Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources,
- Set<DataSegment> availableSegments,
+ @Nullable TreeSet<DataSegment> availableSegments,
Map<String, LoadQueuePeon> loadManagementPeons,
ReplicationThrottler replicationManager,
ServiceEmitter emitter,
@@ -346,22 +359,37 @@ public class DruidCoordinatorRuntimeParams
return this;
}
+ /** This method must be used in test code only. */
@VisibleForTesting
- public Builder withAvailableSegments(DataSegment... availableSegments)
+ public Builder withAvailableSegmentsInTest(DataSegment... availableSegments)
{
- this.availableSegments.addAll(Arrays.asList(availableSegments));
- return this;
+ return withAvailableSegmentsInTest(Arrays.asList(availableSegments));
}
- public Builder withAvailableSegments(Collection<DataSegment> availableSegments)
+ /** This method must be used in test code only. */
+ @VisibleForTesting
+ public Builder withAvailableSegmentsInTest(Collection<DataSegment> availableSegments)
{
- this.availableSegments.addAll(Collections.unmodifiableCollection(availableSegments));
+ return setAvailableSegments(createAvailableSegmentsSet(availableSegments));
+ }
+
+ /**
+ * Note: unlike {@link #withAvailableSegmentsInTest(Collection)}, this method doesn't make a defensive copy of the
+ * provided set. The set passed into this method must not be modified afterwards.
+ */
+ public Builder setAvailableSegments(TreeSet<DataSegment> availableSegments)
+ {
+ //noinspection ObjectEquality
+ if (availableSegments.comparator() != DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST) {
+ throw new IllegalArgumentException("Expected DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST");
+ }
+ this.availableSegments = availableSegments;
return this;
}
public Builder withLoadManagementPeons(Map<String, LoadQueuePeon> loadManagementPeonsCollection)
{
- loadManagementPeons.putAll(Collections.unmodifiableMap(loadManagementPeonsCollection));
+ loadManagementPeons.putAll(loadManagementPeonsCollection);
return this;
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java
index 703f52edfda..cbb0c265f73 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java
@@ -81,13 +81,13 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
private final AtomicInteger failedAssignCount = new AtomicInteger(0);
private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentSkipListMap<>(
- DruidCoordinator.SEGMENT_COMPARATOR
+ DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST
);
private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentSkipListMap<>(
- DruidCoordinator.SEGMENT_COMPARATOR
+ DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST
);
private final ConcurrentSkipListSet<DataSegment> segmentsMarkedToDrop = new ConcurrentSkipListSet<>(
- DruidCoordinator.SEGMENT_COMPARATOR
+ DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST
);
private final ScheduledExecutorService processingExecutor;
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java
index 0402c9e2345..e770ef78210 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java
@@ -34,7 +34,7 @@ final class ReservoirSegmentSampler
int numSoFar = 0;
for (ServerHolder server : serverHolders) {
- for (DataSegment segment : server.getServer().getSegments()) {
+ for (DataSegment segment : server.getServer().getLazyAllSegments()) {
int randNum = ThreadLocalRandom.current().nextInt(numSoFar + 1);
// w.p. 1 / (numSoFar+1), swap out the server and segment
if (randNum == numSoFar) {
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java b/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java
index 33e6309d815..ce08bfe2773 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java
@@ -43,7 +43,7 @@ public class SegmentReplicantLookup
for (ServerHolder serverHolder : serversByType) {
ImmutableDruidServer server = serverHolder.getServer();
- for (DataSegment segment : server.getSegments()) {
+ for (DataSegment segment : server.getLazyAllSegments()) {
Integer numReplicants = segmentsInCluster.get(segment.getId(), server.getTier());
if (numReplicants == null) {
numReplicants = 0;
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorBalancer.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
index cf8d7253191..b4b900b58a3 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
@@ -134,7 +134,7 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
int numSegments = 0;
for (ServerHolder sourceHolder : servers) {
- numSegments += sourceHolder.getServer().getSegments().size();
+ numSegments += sourceHolder.getServer().getLazyAllSegments().size();
}
if (numSegments == 0) {
@@ -191,7 +191,18 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
//noinspection ForLoopThatDoesntUseLoopVariable
for (int iter = 0; (moved + unmoved) < maxSegmentsToMove; ++iter) {
final BalancerSegmentHolder segmentToMoveHolder = strategy.pickSegmentToMove(toMoveFrom);
- if (segmentToMoveHolder != null && params.getAvailableSegments().contains(segmentToMoveHolder.getSegment())) {
+ if (segmentToMoveHolder == null) {
+ log.info("All servers to move segments from are empty, ending run.");
+ break;
+ }
+ // DruidCoordinatorRuntimeParams.getAvailableSegments originate from MetadataSegmentManager, i. e. that's a
+ // "desired" or "theoretical" set of segments. segmentToMoveHolder.getSegment originates from ServerInventoryView,
+ // i. e. that may be any segment that happens to be loaded on some server, even if it "shouldn't" from the
+ // "theoretical" point of view (Coordinator closes such discrepancies eventually via
+ // DruidCoordinatorCleanupUnneeded). Therefore the picked segmentToMoveHolder's segment may not need to be
+ // balanced.
+ boolean needToBalancePickedSegment = params.getAvailableSegments().contains(segmentToMoveHolder.getSegment());
+ if (needToBalancePickedSegment) {
final DataSegment segmentToMove = segmentToMoveHolder.getSegment();
final ImmutableDruidServer fromServer = segmentToMoveHolder.getFromServer();
// we want to leave the server the segment is currently on in the list...
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java
index 7c41f990b00..a7a1bcce220 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java
@@ -24,7 +24,6 @@ import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DruidCluster;
-import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.LoadQueuePeon;
import org.apache.druid.server.coordinator.ServerHolder;
@@ -39,15 +38,6 @@ public class DruidCoordinatorCleanupUnneeded implements DruidCoordinatorHelper
{
private static final Logger log = new Logger(DruidCoordinatorCleanupUnneeded.class);
- private final DruidCoordinator coordinator;
-
- public DruidCoordinatorCleanupUnneeded(
- DruidCoordinator coordinator
- )
- {
- this.coordinator = coordinator;
- }
-
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
@@ -55,47 +45,46 @@ public class DruidCoordinatorCleanupUnneeded implements DruidCoordinatorHelper
Set<DataSegment> availableSegments = params.getAvailableSegments();
DruidCluster cluster = params.getDruidCluster();
+ if (availableSegments.isEmpty()) {
+ log.info(
+ "Found 0 availableSegments, skipping the cleanup of segments from historicals. This is done to prevent " +
+ "a race condition in which the coordinator would drop all segments if it started running cleanup before " +
+ "it finished polling the metadata storage for available segments for the first time."
+ );
+ return params.buildFromExisting().withCoordinatorStats(stats).build();
+ }
+
// Drop segments that no longer exist in the available segments configuration, *if* it has been populated. (It might
// not have been loaded yet since it's filled asynchronously. But it's also filled atomically, so if there are any
// segments at all, we should have all of them.)
// Note that if metadata store has no segments, then availableSegments will stay empty and nothing will be dropped.
// This is done to prevent a race condition in which the coordinator would drop all segments if it started running
// cleanup before it finished polling the metadata storage for available segments for the first time.
- if (!availableSegments.isEmpty()) {
- for (SortedSet<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
- for (ServerHolder serverHolder : serverHolders) {
- ImmutableDruidServer server = serverHolder.getServer();
+ for (SortedSet<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
+ for (ServerHolder serverHolder : serverHolders) {
+ ImmutableDruidServer server = serverHolder.getServer();
- for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
- for (DataSegment segment : dataSource.getSegments()) {
- if (!availableSegments.contains(segment)) {
- LoadQueuePeon queuePeon = params.getLoadManagementPeons().get(server.getName());
+ for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
+ for (DataSegment segment : dataSource.getSegments()) {
+ if (!availableSegments.contains(segment)) {
+ LoadQueuePeon queuePeon = params.getLoadManagementPeons().get(server.getName());
- if (!queuePeon.getSegmentsToDrop().contains(segment)) {
- queuePeon.dropSegment(segment, () -> {});
- stats.addToTieredStat("unneededCount", server.getTier(), 1);
- log.info(
- "Dropping uneeded segment [%s] from server [%s] in tier [%s]",
- segment.getId(),
- server.getName(),
- server.getTier()
- );
- }
+ if (!queuePeon.getSegmentsToDrop().contains(segment)) {
+ queuePeon.dropSegment(segment, () -> {});
+ stats.addToTieredStat("unneededCount", server.getTier(), 1);
+ log.info(
+ "Dropping uneeded segment [%s] from server [%s] in tier [%s]",
+ segment.getId(),
+ server.getName(),
+ server.getTier()
+ );
}
}
}
}
}
- } else {
- log.info(
- "Found 0 availableSegments, skipping the cleanup of segments from historicals. This is done to prevent a race condition in which the coordinator would drop all segments if it started running cleanup before it finished polling the metadata storage for available segments for the first time."
- );
}
- return params.buildFromExisting()
- .withCoordinatorStats(stats)
- .build();
+ return params.buildFromExisting().withCoordinatorStats(stats).build();
}
-
-
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorLogger.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorLogger.java
index 1e7ef94fc1f..a271109f6a7 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorLogger.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorLogger.java
@@ -227,17 +227,17 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper
}
);
- coordinator.getReplicationStatus().forEach(
- (final String tier, final Object2LongMap<String> status) -> {
- for (final Object2LongMap.Entry<String> entry : status.object2LongEntrySet()) {
+ coordinator.computeUnderReplicationCountsPerDataSourcePerTier().forEach(
+ (final String tier, final Object2LongMap<String> underReplicationCountsPerDataSource) -> {
+ for (final Object2LongMap.Entry<String> entry : underReplicationCountsPerDataSource.object2LongEntrySet()) {
final String dataSource = entry.getKey();
- final long count = entry.getLongValue();
+ final long underReplicationCount = entry.getLongValue();
emitter.emit(
new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.TIER, tier)
.setDimension(DruidMetrics.DATASOURCE, dataSource).build(
- "segment/underReplicated/count", count
+ "segment/underReplicated/count", underReplicationCount
)
);
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java
index 524797d9170..7d7830170c9 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java
@@ -89,43 +89,16 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
// find available segments which are not overshadowed by other segments in DB
// only those would need to be loaded/dropped
// anything overshadowed by served segments is dropped automatically by DruidCoordinatorCleanupOvershadowed
- Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = new HashMap<>();
- for (DataSegment segment : params.getAvailableSegments()) {
- VersionedIntervalTimeline<String, DataSegment> timeline = timelines.get(segment.getDataSource());
- if (timeline == null) {
- timeline = new VersionedIntervalTimeline<>(Ordering.natural());
- timelines.put(segment.getDataSource(), timeline);
- }
-
- timeline.add(
- segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)
- );
- }
-
- Set<DataSegment> overshadowed = new HashSet<>();
- for (VersionedIntervalTimeline<String, DataSegment> timeline : timelines.values()) {
- for (TimelineObjectHolder<String, DataSegment> holder : timeline.findOvershadowed()) {
- for (DataSegment dataSegment : holder.getObject().payloads()) {
- overshadowed.add(dataSegment);
- }
- }
- }
-
- Set<DataSegment> nonOvershadowed = new HashSet<>();
- for (DataSegment dataSegment : params.getAvailableSegments()) {
- if (!overshadowed.contains(dataSegment)) {
- nonOvershadowed.add(dataSegment);
- }
- }
+ Set<DataSegment> overshadowed = determineOvershadowedSegments(params);
for (String tier : cluster.getTierNames()) {
replicatorThrottler.updateReplicationState(tier);
}
- DruidCoordinatorRuntimeParams paramsWithReplicationManager = params.buildFromExistingWithoutAvailableSegments()
- .withReplicationManager(replicatorThrottler)
- .withAvailableSegments(nonOvershadowed)
- .build();
+ DruidCoordinatorRuntimeParams paramsWithReplicationManager = params
+ .buildFromExistingWithoutAvailableSegments()
+ .withReplicationManager(replicatorThrottler)
+ .build();
// Run through all matched rules for available segments
DateTime now = DateTimes.nowUtc();
@@ -133,7 +106,11 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
final List<SegmentId> segmentsWithMissingRules = Lists.newArrayListWithCapacity(MAX_MISSING_RULES);
int missingRules = 0;
- for (DataSegment segment : paramsWithReplicationManager.getAvailableSegments()) {
+ for (DataSegment segment : params.getAvailableSegments()) {
+ if (overshadowed.contains(segment)) {
+ // Skipping overshadowed segments
+ continue;
+ }
List<Rule> rules = databaseRuleManager.getRulesWithDefault(segment.getDataSource());
boolean foundMatchingRule = false;
for (Rule rule : rules) {
@@ -159,9 +136,26 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
.emit();
}
- return paramsWithReplicationManager.buildFromExistingWithoutAvailableSegments()
- .withCoordinatorStats(stats)
- .withAvailableSegments(params.getAvailableSegments())
- .build();
+ return params.buildFromExisting().withCoordinatorStats(stats).build();
+ }
+
+ private Set<DataSegment> determineOvershadowedSegments(DruidCoordinatorRuntimeParams params)
+ {
+ Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = new HashMap<>();
+ for (DataSegment segment : params.getAvailableSegments()) {
+ timelines
+ .computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural()))
+ .add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
+ }
+
+ Set<DataSegment> overshadowed = new HashSet<>();
+ for (VersionedIntervalTimeline<String, DataSegment> timeline : timelines.values()) {
+ for (TimelineObjectHolder<String, DataSegment> holder : timeline.findOvershadowed()) {
+ for (DataSegment dataSegment : holder.getObject().payloads()) {
+ overshadowed.add(dataSegment);
+ }
+ }
+ }
+ return overshadowed;
}
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentInfoLoader.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentInfoLoader.java
index 788b0d5c2da..801bd3ba502 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentInfoLoader.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentInfoLoader.java
@@ -19,18 +19,19 @@
package org.apache.druid.server.coordinator.helper;
-import org.apache.druid.java.util.common.logger.Logger;
+import com.google.common.collect.Iterables;
+import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.timeline.DataSegment;
-import java.util.Set;
+import java.util.TreeSet;
public class DruidCoordinatorSegmentInfoLoader implements DruidCoordinatorHelper
{
- private final DruidCoordinator coordinator;
+ private static final EmittingLogger log = new EmittingLogger(DruidCoordinatorSegmentInfoLoader.class);
- private static final Logger log = new Logger(DruidCoordinatorSegmentInfoLoader.class);
+ private final DruidCoordinator coordinator;
public DruidCoordinatorSegmentInfoLoader(DruidCoordinator coordinator)
{
@@ -42,8 +43,31 @@ public class DruidCoordinatorSegmentInfoLoader implements DruidCoordinatorHelper
{
log.info("Starting coordination. Getting available segments.");
- // Display info about all available segments
- final Set<DataSegment> availableSegments = coordinator.getOrderedAvailableDataSegments();
+ // The following transform() call doesn't actually transform the iterable. It only checks the sizes of the segments
+ // and emits alerts if segments with negative sizes are encountered. In other words, semantically it's similar to
+ // Stream.peek(). It works as long as DruidCoordinatorRuntimeParams.createAvailableSegmentsSet() (which is called
+ // below) guarantees to go over the passed iterable exactly once.
+ //
+ // An iterable returned from iterateAvailableDataSegments() is not simply iterated (with size checks) before passing
+ // into DruidCoordinatorRuntimeParams.createAvailableSegmentsSet() because iterateAvailableDataSegments()'s
+ // documentation says to strive to avoid iterating the result more than once.
+ //
+ //noinspection StaticPseudoFunctionalStyleMethod: https://youtrack.jetbrains.com/issue/IDEA-153047
+ Iterable<DataSegment> availableSegmentsWithSizeChecking = Iterables.transform(
+ coordinator.iterateAvailableDataSegments(),
+ segment -> {
+ if (segment.getSize() < 0) {
+ log.makeAlert("No size on a segment")
+ .addData("segment", segment)
+ .emit();
+ }
+ return segment;
+ }
+ );
+ final TreeSet<DataSegment> availableSegments =
+ DruidCoordinatorRuntimeParams.createAvailableSegmentsSet(availableSegmentsWithSizeChecking);
+
+ // Log info about all available segments
if (log.isDebugEnabled()) {
log.debug("Available DataSegments");
for (DataSegment dataSegment : availableSegments) {
@@ -54,7 +78,7 @@ public class DruidCoordinatorSegmentInfoLoader implements DruidCoordinatorHelper
log.info("Found [%,d] available segments.", availableSegments.size());
return params.buildFromExisting()
- .withAvailableSegments(availableSegments)
+ .setAvailableSegments(availableSegments)
.build();
}
}
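The lazy size-checking pattern above, reduced to its essentials (hypothetical example class; plain integers stand in for segments): Iterables.transform defers the side-effecting check until the single pass that builds the sorted set, so the source is traversed exactly once.

import com.google.common.collect.Iterables;

import java.util.Arrays;
import java.util.TreeSet;

public class LazyPeekExample
{
  public static void main(String[] args)
  {
    Iterable<Integer> source = Arrays.asList(3, -1, 2);
    // Identity transform used only for its side effect, like Stream.peek(): the check runs
    // once per element, during the single traversal below.
    Iterable<Integer> checked = Iterables.transform(source, value -> {
      if (value < 0) {
        System.err.println("negative value: " + value);
      }
      return value;
    });
    TreeSet<Integer> sorted = new TreeSet<>();
    checked.forEach(sorted::add);
    System.out.println(sorted); // [-1, 2, 3]
  }
}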
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java
index f2c52a1eb86..48aff512922 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java
@@ -43,7 +43,6 @@ import org.joda.time.Interval;
@JsonSubTypes.Type(name = IntervalBroadcastDistributionRule.TYPE, value = IntervalBroadcastDistributionRule.class),
@JsonSubTypes.Type(name = PeriodBroadcastDistributionRule.TYPE, value = PeriodBroadcastDistributionRule.class)
})
-
public interface Rule
{
String getType();
@@ -52,5 +51,16 @@ public interface Rule
boolean appliesTo(Interval interval, DateTime referenceTimestamp);
+ /**
+ * {@link DruidCoordinatorRuntimeParams#getAvailableSegments()} must not be called in Rule's code, because the
+ * available segments are not specified for the {@link DruidCoordinatorRuntimeParams} passed into Rule's code. This is
+ * because {@link DruidCoordinatorRuntimeParams} entangles two slightly different (not yet existing) abstractions:
+ * "DruidCoordinatorHelperParams" and "RuleParams" which contain params that only {@link
+ * org.apache.druid.server.coordinator.helper.DruidCoordinatorHelper}s and Rules need, respectively.
+ * For example, {@link org.apache.druid.server.coordinator.ReplicationThrottler} needs to belong only to "RuleParams",
+ * but not "DruidCoordinatorHelperParams". The opposite for "AvailableSegments".
+ *
+ * See https://github.com/apache/incubator-druid/issues/7228
+ */
CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntimeParams params, DataSegment segment);
}
diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorResource.java
index 6578d74786b..779e909c4f6 100644
--- a/server/src/main/java/org/apache/druid/server/http/CoordinatorResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorResource.java
@@ -95,7 +95,7 @@ public class CoordinatorResource
}
if (full != null) {
- return Response.ok(coordinator.getReplicationStatus()).build();
+ return Response.ok(coordinator.computeUnderReplicationCountsPerDataSourcePerTier()).build();
}
return Response.ok(coordinator.getLoadStatus()).build();
}
diff --git a/server/src/main/java/org/apache/druid/server/http/ServersResource.java b/server/src/main/java/org/apache/druid/server/http/ServersResource.java
index 437eb5ae905..9b31a67aa13 100644
--- a/server/src/main/java/org/apache/druid/server/http/ServersResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/ServersResource.java
@@ -80,8 +80,8 @@ public class ServersResource
* *and* values, but is done so for compatibility with the existing HTTP JSON API.
*
* Returns a lazy map suitable for serialization (i. e. entrySet iteration) only, relying on the fact that the
- * segments returned from {@link DruidServer#getSegments()} are unique. This is not a part of the {@link DruidServer}
- * API to not let abuse this map (like trying to get() from it).
+ * segments returned from {@link DruidServer#iterateAllSegments()} are unique. This map is deliberately not part of
+ * the {@link DruidServer} API, so that it cannot be misused (e.g. by calling get() on it).
*/
private static Map<SegmentId, DataSegment> createLazySegmentsMap(DruidServer server)
{
@@ -96,7 +96,7 @@ public class ServersResource
public Iterator<Map.Entry<SegmentId, DataSegment>> iterator()
{
return Iterators.transform(
- server.getSegments().iterator(),
+ server.iterateAllSegments().iterator(),
segment -> new AbstractMap.SimpleImmutableEntry<>(segment.getId(), segment)
);
}
@@ -173,11 +173,11 @@ public class ServersResource
}
if (full != null) {
- return builder.entity(Iterables.toString(server.getSegments())).build();
+ return builder.entity(Iterables.toString(server.iterateAllSegments())).build();
}
return builder
- .entity(Iterables.toString(Iterables.transform(server.getSegments(), DataSegment::getId)))
+ .entity(Iterables.toString(Iterables.transform(server.iterateAllSegments(), DataSegment::getId)))
.build();
}
diff --git a/server/src/main/java/org/apache/druid/server/http/TiersResource.java b/server/src/main/java/org/apache/druid/server/http/TiersResource.java
index 4debc83d17e..51a0fdad1d8 100644
--- a/server/src/main/java/org/apache/druid/server/http/TiersResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/TiersResource.java
@@ -104,7 +104,7 @@ public class TiersResource
Map<String, Map<Interval, Map<IntervalProperties, Object>>> tierToStatsPerInterval = new HashMap<>();
for (DruidServer druidServer : serverInventoryView.getInventory()) {
if (druidServer.getTier().equalsIgnoreCase(tierName)) {
- for (DataSegment dataSegment : druidServer.getSegments()) {
+ for (DataSegment dataSegment : druidServer.iterateAllSegments()) {
Map<IntervalProperties, Object> properties = tierToStatsPerInterval
.computeIfAbsent(dataSegment.getDataSource(), dsName -> new HashMap<>())
.computeIfAbsent(dataSegment.getInterval(), interval -> new EnumMap<>(IntervalProperties.class));
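The loop above builds a dataSource → interval → properties tree in a single pass by chaining computeIfAbsent calls, with an EnumMap at the innermost level. The same idiom with generic names (none of these types are Druid's):

import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class NestedGrouping
{
  enum Property { COUNT }

  // Groups rows into outerKey -> innerKey -> properties in one pass,
  // creating the intermediate maps on demand via computeIfAbsent.
  static Map<String, Map<String, Map<Property, Object>>> group(List<String[]> rows)
  {
    Map<String, Map<String, Map<Property, Object>>> tree = new HashMap<>();
    for (String[] row : rows) {
      Map<Property, Object> props = tree
          .computeIfAbsent(row[0], outer -> new HashMap<>())
          .computeIfAbsent(row[1], inner -> new EnumMap<>(Property.class));
      props.merge(Property.COUNT, 1, (old, one) -> (Integer) old + 1);
    }
    return tree;
  }
}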
diff --git a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
index d4f48905119..5e07b7fb71b 100644
--- a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
+++ b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
@@ -259,7 +259,7 @@ public class HttpServerInventoryViewTest
DruidServer druidServer = httpServerInventoryView.getInventoryValue("host:8080");
Assert.assertEquals(
ImmutableMap.of(segment3.getId(), segment3, segment4.getId(), segment4),
- Maps.uniqueIndex(druidServer.getSegments(), DataSegment::getId)
+ Maps.uniqueIndex(druidServer.iterateAllSegments(), DataSegment::getId)
);
druidNodeDiscovery.listener.nodesRemoved(ImmutableList.of(druidNode));
diff --git a/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java b/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java
index 1586b60b47f..d2de54fa827 100644
--- a/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java
+++ b/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java
@@ -237,7 +237,7 @@ public class BatchServerInventoryViewTest
waitForSync(batchServerInventoryView, testSegments);
DruidServer server = Iterables.get(batchServerInventoryView.getInventory(), 0);
- Set<DataSegment> segments = Sets.newHashSet(server.getSegments());
+ Set<DataSegment> segments = Sets.newHashSet(server.iterateAllSegments());
Assert.assertEquals(testSegments, segments);
@@ -251,7 +251,7 @@ public class BatchServerInventoryViewTest
waitForSync(batchServerInventoryView, testSegments);
- Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments()));
+ Assert.assertEquals(testSegments, Sets.newHashSet(server.iterateAllSegments()));
segmentAnnouncer.unannounceSegment(segment1);
segmentAnnouncer.unannounceSegment(segment2);
@@ -260,7 +260,7 @@ public class BatchServerInventoryViewTest
waitForSync(batchServerInventoryView, testSegments);
- Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments()));
+ Assert.assertEquals(testSegments, Sets.newHashSet(server.iterateAllSegments()));
}
@Test
@@ -271,7 +271,7 @@ public class BatchServerInventoryViewTest
waitForSync(filteredBatchServerInventoryView, testSegments);
DruidServer server = Iterables.get(filteredBatchServerInventoryView.getInventory(), 0);
- Set<DataSegment> segments = Sets.newHashSet(server.getSegments());
+ Set<DataSegment> segments = Sets.newHashSet(server.iterateAllSegments());
Assert.assertEquals(testSegments, segments);
int prevUpdateCount = inventoryUpdateCounter.get();
@@ -297,7 +297,7 @@ public class BatchServerInventoryViewTest
waitForSync(filteredBatchServerInventoryView, testSegments);
DruidServer server = Iterables.get(filteredBatchServerInventoryView.getInventory(), 0);
- Set<DataSegment> segments = Sets.newHashSet(server.getSegments());
+ Set<DataSegment> segments = Sets.newHashSet(server.iterateAllSegments());
Assert.assertEquals(testSegments, segments);
@@ -393,7 +393,7 @@ public class BatchServerInventoryViewTest
final Timing forWaitingTiming = timing.forWaiting();
Stopwatch stopwatch = Stopwatch.createStarted();
while (Iterables.isEmpty(batchServerInventoryView.getInventory())
- || Iterables.size(Iterables.get(batchServerInventoryView.getInventory(), 0).getSegments()) !=
+ || Iterables.size(Iterables.get(batchServerInventoryView.getInventory(), 0).iterateAllSegments()) !=
testSegments.size()) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > forWaitingTiming.milliseconds()) {
@@ -430,7 +430,7 @@ public class BatchServerInventoryViewTest
waitForSync(batchServerInventoryView, testSegments);
DruidServer server = Iterables.get(batchServerInventoryView.getInventory(), 0);
- final Set<DataSegment> segments = Sets.newHashSet(server.getSegments());
+ final Set<DataSegment> segments = Sets.newHashSet(server.iterateAllSegments());
Assert.assertEquals(testSegments, segments);
@@ -498,7 +498,7 @@ public class BatchServerInventoryViewTest
Assert.assertEquals(INITIAL_SEGMENTS * 2, testSegments.size());
waitForSync(batchServerInventoryView, testSegments);
- Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments()));
+ Assert.assertEquals(testSegments, Sets.newHashSet(server.iterateAllSegments()));
for (int i = 0; i < INITIAL_SEGMENTS; ++i) {
final DataSegment segment = makeSegment(100 + i);
@@ -508,6 +508,6 @@ public class BatchServerInventoryViewTest
waitForSync(batchServerInventoryView, testSegments);
- Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments()));
+ Assert.assertEquals(testSegments, Sets.newHashSet(server.iterateAllSegments()));
}
}
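The waitForSync helper above polls every 100 ms and gives up once the waiting budget elapses; the same idiom extracted as a generic sketch (the helper name and signature are assumptions, not Druid API):

import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

final class PollingWaits
{
  // Polls `condition` every 100 ms until it holds, or throws once `timeoutMillis` elapses.
  static void waitFor(BooleanSupplier condition, long timeoutMillis) throws InterruptedException
  {
    Stopwatch stopwatch = Stopwatch.createStarted();
    while (!condition.getAsBoolean()) {
      Thread.sleep(100);
      if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > timeoutMillis) {
        throw new IllegalStateException("Condition not met within " + timeoutMillis + " ms");
      }
    }
  }
}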
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CostBalancerStrategyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CostBalancerStrategyTest.java
index 8a5cf43efe6..e6aa17ebd1c 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/CostBalancerStrategyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/CostBalancerStrategyTest.java
@@ -94,7 +94,7 @@ public class CostBalancerStrategyTest
segments.put(segment.getId(), segment);
EasyMock.expect(druidServer.getSegment(segment.getId())).andReturn(segment).anyTimes();
}
- EasyMock.expect(druidServer.getSegments()).andReturn(segments.values()).anyTimes();
+ EasyMock.expect(druidServer.getLazyAllSegments()).andReturn(segments.values()).anyTimes();
EasyMock.replay(druidServer);
serverHolderList.add(new ServerHolder(druidServer, fromPeon));
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
index 94fecdb4d78..25dce8cf577 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
+import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.utils.ZKPaths;
@@ -35,6 +36,7 @@ import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.curator.CuratorTestBase;
+import org.apache.druid.curator.CuratorUtils;
import org.apache.druid.curator.discovery.NoopServiceAnnouncer;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.jackson.DefaultObjectMapper;
@@ -324,10 +326,10 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
// for the destination and unannouncing from source server when noticing a drop request
sourceLoadQueueChildrenCache.getListenable().addListener(
- (curatorFramework, pathChildrenCacheEvent) -> {
- if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
+ (CuratorFramework curatorFramework, PathChildrenCacheEvent event) -> {
+ if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
srcCountdown.countDown();
- } else if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
+ } else if (CuratorUtils.isChildAdded(event)) {
//Simulate source server dropping segment
unannounceSegmentFromBatchForServer(source, segmentToMove, sourceSegKeys.get(2), zkPathsConfig);
}
@@ -335,10 +337,10 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
);
destinationLoadQueueChildrenCache.getListenable().addListener(
- (curatorFramework, pathChildrenCacheEvent) -> {
- if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
+ (CuratorFramework curatorFramework, PathChildrenCacheEvent event) -> {
+ if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
destCountdown.countDown();
- } else if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
+ } else if (CuratorUtils.isChildAdded(event)) {
//Simulate destination server loading segment
announceBatchSegmentsForServer(dest, ImmutableSet.of(segmentToMove), zkPathsConfig, jsonMapper);
}
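CuratorUtils.isChildAdded is introduced elsewhere in this patch and is not visible in this section; judging by the expressions it replaces here, it is presumably just:

// Presumed implementation, inferred from the call sites above.
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;

public final class CuratorUtils
{
  public static boolean isChildAdded(PathChildrenCacheEvent event)
  {
    return event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED);
  }

  private CuratorUtils() {}
}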
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DiskNormalizedCostBalancerStrategyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DiskNormalizedCostBalancerStrategyTest.java
index fc2a17ce9d6..0bbe46d9b85 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DiskNormalizedCostBalancerStrategyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DiskNormalizedCostBalancerStrategyTest.java
@@ -88,7 +88,7 @@ public class DiskNormalizedCostBalancerStrategyTest
segments.add(segment);
EasyMock.expect(druidServer.getSegment(segment.getId())).andReturn(segment).anyTimes();
}
- EasyMock.expect(druidServer.getSegments()).andReturn(segments).anyTimes();
+ EasyMock.expect(druidServer.getLazyAllSegments()).andReturn(segments).anyTimes();
EasyMock.replay(druidServer);
serverHolderList.add(new ServerHolder(druidServer, fromPeon));
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorBalancerProfiler.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorBalancerProfiler.java
index 719a2b4553f..ed4f600c2cd 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorBalancerProfiler.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorBalancerProfiler.java
@@ -116,9 +116,9 @@ public class DruidCoordinatorBalancerProfiler
EasyMock.expect(server.getName()).andReturn(Integer.toString(i)).atLeastOnce();
EasyMock.expect(server.getHost()).andReturn(Integer.toString(i)).anyTimes();
if (i == 0) {
- EasyMock.expect(server.getSegments()).andReturn(segments).anyTimes();
+ EasyMock.expect(server.getLazyAllSegments()).andReturn(segments).anyTimes();
} else {
- EasyMock.expect(server.getSegments()).andReturn(Collections.emptyList()).anyTimes();
+ EasyMock.expect(server.getLazyAllSegments()).andReturn(Collections.emptyList()).anyTimes();
}
EasyMock.expect(server.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.replay(server);
@@ -148,7 +148,7 @@ public class DruidCoordinatorBalancerProfiler
.withLoadManagementPeons(
peonMap
)
- .withAvailableSegments(segments)
+ .withAvailableSegmentsInTest(segments)
.withDynamicConfigs(
CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(
MAX_SEGMENTS_TO_MOVE
@@ -197,7 +197,7 @@ public class DruidCoordinatorBalancerProfiler
EasyMock.expect(druidServer1.getName()).andReturn("from").atLeastOnce();
EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
- EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes();
+ EasyMock.expect(druidServer1.getLazyAllSegments()).andReturn(segments).anyTimes();
EasyMock.expect(druidServer1.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer1);
@@ -205,7 +205,7 @@ public class DruidCoordinatorBalancerProfiler
EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce();
EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
- EasyMock.expect(druidServer2.getSegments()).andReturn(Collections.emptyList()).anyTimes();
+ EasyMock.expect(druidServer2.getLazyAllSegments()).andReturn(Collections.emptyList()).anyTimes();
EasyMock.expect(druidServer2.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer2);
@@ -246,7 +246,7 @@ public class DruidCoordinatorBalancerProfiler
toPeon
)
)
- .withAvailableSegments(segments)
+ .withAvailableSegmentsInTest(segments)
.withDynamicConfigs(
CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(
MAX_SEGMENTS_TO_MOVE
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorBalancerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorBalancerTest.java
index dbd3048e539..0a3089808c6 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorBalancerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorBalancerTest.java
@@ -535,7 +535,7 @@ public class DruidCoordinatorBalancerTest
.boxed()
.collect(Collectors.toMap(i -> String.valueOf(i + 1), peons::get))
)
- .withAvailableSegments(segments)
+ .withAvailableSegmentsInTest(segments)
.withDynamicConfigs(
CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(
MAX_SEGMENTS_TO_MOVE
@@ -558,7 +558,7 @@ public class DruidCoordinatorBalancerTest
EasyMock.expect(druidServer.getTier()).andReturn(tier).anyTimes();
EasyMock.expect(druidServer.getCurrSize()).andReturn(currentSize).atLeastOnce();
EasyMock.expect(druidServer.getMaxSize()).andReturn(maxSize).atLeastOnce();
- EasyMock.expect(druidServer.getSegments()).andReturn(segments).anyTimes();
+ EasyMock.expect(druidServer.getLazyAllSegments()).andReturn(segments).anyTimes();
EasyMock.expect(druidServer.getHost()).andReturn(name).anyTimes();
EasyMock.expect(druidServer.getType()).andReturn(ServerType.HISTORICAL).anyTimes();
if (!segments.isEmpty()) {
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
index 80dfd82652a..51a79307c75 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
@@ -198,7 +198,7 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
- .withAvailableSegments(availableSegments)
+ .withAvailableSegmentsInTest(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withBalancerStrategy(balancerStrategy)
@@ -304,7 +304,7 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
- .withAvailableSegments(availableSegments)
+ .withAvailableSegmentsInTest(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withBalancerStrategy(balancerStrategy)
@@ -403,7 +403,7 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
- .withAvailableSegments(availableSegments)
+ .withAvailableSegmentsInTest(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategy(balancerStrategy)
@@ -478,7 +478,7 @@ public class DruidCoordinatorRuleRunnerTest
new DruidCoordinatorRuntimeParams.Builder()
.withEmitter(emitter)
.withDruidCluster(druidCluster)
- .withAvailableSegments(availableSegments)
+ .withAvailableSegmentsInTest(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withBalancerStrategy(balancerStrategy)
@@ -540,7 +540,7 @@ public class DruidCoordinatorRuleRunnerTest
new DruidCoordinatorRuntimeParams.Builder()
.withEmitter(emitter)
.withDruidCluster(druidCluster)
- .withAvailableSegments(availableSegments)
+ .withAvailableSegmentsInTest(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.build();
@@ -609,7 +609,7 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMillisToWaitBeforeDeleting(0L).build())
- .withAvailableSegments(availableSegments)
+ .withAvailableSegmentsInTest(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategy(balancerStrategy)
@@ -695,7 +695,7 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMillisToWaitBeforeDeleting(0L).build())
- .withAvailableSegments(availableSegments)
+ .withAvailableSegmentsInTest(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategy(balancerStrategy)
@@ -786,7 +786,7 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMillisToWaitBeforeDeleting(0L).build())
- .withAvailableSegments(availableSegments)
+ .withAvailableSegmentsInTest(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategy(balancerStrategy)
@@ -865,7 +865,7 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMillisToWaitBeforeDeleting(0L).build())
- .withAvailableSegments(availableSegments)
+ .withAvailableSegmentsInTest(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategy(balancerStrategy)
@@ -963,7 +963,7 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMillisToWaitBeforeDeleting(0L).build())
- .withAvailableSegments(availableSegments)
+ .withAvailableSegmentsInTest(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategy(balancerStrategy)
@@ -1047,7 +1047,7 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
- .withAvailableSegments(availableSegments)
+ .withAvailableSegmentsInTest(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withBalancerStrategy(balancerStrategy)
@@ -1077,7 +1077,7 @@ public class DruidCoordinatorRuleRunnerTest
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withEmitter(emitter)
- .withAvailableSegments(Collections.singletonList(overFlowSegment))
+ .withAvailableSegmentsInTest(Collections.singletonList(overFlowSegment))
.withDatabaseRuleManager(databaseRuleManager)
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
@@ -1180,7 +1180,7 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
- .withAvailableSegments(availableSegments)
+ .withAvailableSegmentsInTest(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
@@ -1286,7 +1286,7 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMillisToWaitBeforeDeleting(0L).build())
- .withAvailableSegments(longerAvailableSegments)
+ .withAvailableSegmentsInTest(longerAvailableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withBalancerStrategy(balancerStrategy)
@@ -1369,7 +1369,7 @@ public class DruidCoordinatorRuleRunnerTest
DruidCoordinatorRuntimeParams params =
new DruidCoordinatorRuntimeParams.Builder()
.withDruidCluster(druidCluster)
- .withAvailableSegments(availableSegments)
+ .withAvailableSegmentsInTest(availableSegments)
.withDatabaseRuleManager(databaseRuleManager)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withBalancerStrategy(balancerStrategy)
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index f24f0662948..1f4f7e2844b 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -36,6 +36,7 @@ import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.SingleServerInventoryView;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.curator.CuratorTestBase;
+import org.apache.druid.curator.CuratorUtils;
import org.apache.druid.curator.discovery.NoopServiceAnnouncer;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.jackson.DefaultObjectMapper;
@@ -59,18 +60,15 @@ import org.apache.druid.timeline.SegmentId;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.joda.time.Duration;
-import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.util.ArrayList;
+import javax.annotation.Nullable;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
@@ -82,13 +80,15 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class DruidCoordinatorTest extends CuratorTestBase
{
+ private static final String LOADPATH = "/druid/loadqueue/localhost:1234";
+ private static final long COORDINATOR_START_DELAY = 1;
+ private static final long COORDINATOR_PERIOD = 100;
+
private DruidCoordinator coordinator;
private MetadataSegmentManager databaseSegmentManager;
private SingleServerInventoryView serverInventoryView;
private ScheduledExecutorFactory scheduledExecutorFactory;
private DruidServer druidServer;
- private DruidServer druidServer2;
- private DataSegment segment;
private ConcurrentMap<String, LoadQueuePeon> loadManagementPeons;
private LoadQueuePeon loadQueuePeon;
private MetadataRuleManager metadataRuleManager;
@@ -97,12 +97,8 @@ public class DruidCoordinatorTest extends CuratorTestBase
private PathChildrenCache pathChildrenCache;
private DruidCoordinatorConfig druidCoordinatorConfig;
private ObjectMapper objectMapper;
- private JacksonConfigManager configManager;
private DruidNode druidNode;
private LatchableServiceEmitter serviceEmitter = new LatchableServiceEmitter();
- private static final String LOADPATH = "/druid/loadqueue/localhost:1234";
- private static final long COORDINATOR_START_DELAY = 1;
- private static final long COORDINATOR_PERIOD = 100;
@Before
public void setUp() throws Exception
@@ -111,7 +107,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
serverInventoryView = EasyMock.createMock(SingleServerInventoryView.class);
databaseSegmentManager = EasyMock.createNiceMock(MetadataSegmentManager.class);
metadataRuleManager = EasyMock.createNiceMock(MetadataRuleManager.class);
- configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
+ JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
EasyMock.expect(
configManager.watch(
EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
@@ -228,7 +224,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
@Test
public void testMoveSegment()
{
- segment = EasyMock.createNiceMock(DataSegment.class);
+ final DataSegment segment = EasyMock.createNiceMock(DataSegment.class);
EasyMock.expect(segment.getId()).andReturn(SegmentId.dummy("dummySegment"));
EasyMock.expect(segment.getDataSource()).andReturn("dummyDataSource");
EasyMock.replay(segment);
@@ -269,7 +265,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
).atLeastOnce();
EasyMock.replay(druidServer);
- druidServer2 = EasyMock.createMock(DruidServer.class);
+ DruidServer druidServer2 = EasyMock.createMock(DruidServer.class);
EasyMock.expect(druidServer2.toImmutableDruidServer()).andReturn(
new ImmutableDruidServer(
@@ -324,7 +320,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
EasyMock.replay(metadataRuleManager);
// Setup MetadataSegmentManager
- DruidDataSource[] druidDataSources = {
+ DruidDataSource[] dataSources = {
new DruidDataSource(dataSource, Collections.emptyMap())
};
final DataSegment dataSegment = new DataSegment(
@@ -338,13 +334,9 @@ public class DruidCoordinatorTest extends CuratorTestBase
0x9,
0
);
- druidDataSources[0].addSegment(dataSegment);
+ dataSources[0].addSegment(dataSegment);
- EasyMock.expect(databaseSegmentManager.isStarted()).andReturn(true).anyTimes();
- EasyMock.expect(databaseSegmentManager.getDataSources()).andReturn(
- ImmutableList.of(druidDataSources[0].toImmutableDruidDataSource())
- ).atLeastOnce();
- EasyMock.replay(databaseSegmentManager);
+ setupMetadataSegmentManagerMock(dataSources[0]);
ImmutableDruidDataSource immutableDruidDataSource = EasyMock.createNiceMock(ImmutableDruidDataSource.class);
EasyMock.expect(immutableDruidDataSource.getSegments())
.andReturn(ImmutableSet.of(dataSegment)).atLeastOnce();
@@ -372,9 +364,9 @@ public class DruidCoordinatorTest extends CuratorTestBase
new PathChildrenCacheListener()
{
@Override
- public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent)
+ public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event)
{
- if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
+ if (CuratorUtils.isChildAdded(event)) {
if (assignSegmentLatch.getCount() > 0) {
//Coordinator should try to assign segment to druidServer historical
//Simulate historical loading segment
@@ -402,17 +394,19 @@ public class DruidCoordinatorTest extends CuratorTestBase
Assert.assertEquals(1, segmentAvailability.size());
Assert.assertEquals(0L, segmentAvailability.get(dataSource));
- Map<String, Object2LongMap<String>> replicationStatus = coordinator.getReplicationStatus();
- Assert.assertNotNull(replicationStatus);
- Assert.assertEquals(1, replicationStatus.entrySet().size());
+ Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier =
+ coordinator.computeUnderReplicationCountsPerDataSourcePerTier();
+ Assert.assertNotNull(underReplicationCountsPerDataSourcePerTier);
+ Assert.assertEquals(1, underReplicationCountsPerDataSourcePerTier.size());
- Object2LongMap<String> dataSourceMap = replicationStatus.get(tier);
- Assert.assertNotNull(dataSourceMap);
- Assert.assertEquals(1, dataSourceMap.size());
- Assert.assertNotNull(dataSourceMap.get(dataSource));
+ Object2LongMap<String> underReplicationCountsPerDataSource = underReplicationCountsPerDataSourcePerTier.get(tier);
+ Assert.assertNotNull(underReplicationCountsPerDataSource);
+ Assert.assertEquals(1, underReplicationCountsPerDataSource.size());
+ //noinspection deprecation
+ Assert.assertNotNull(underReplicationCountsPerDataSource.get(dataSource));
// Simulates the segment being added to druidServer during the SegmentChangeRequestLoad event.
// The load rule asks for 2 replicas, so 1 replica should still be pending.
- Assert.assertEquals(1L, dataSourceMap.getLong(dataSource));
+ Assert.assertEquals(1L, underReplicationCountsPerDataSource.getLong(dataSource));
coordinator.stop();
leaderUnannouncerLatch.await();
@@ -467,18 +461,17 @@ public class DruidCoordinatorTest extends CuratorTestBase
DruidDataSource[] druidDataSources = {new DruidDataSource(dataSource, Collections.emptyMap())};
dataSegments.values().forEach(druidDataSources[0]::addSegment);
+ setupMetadataSegmentManagerMock(druidDataSources[0]);
+
EasyMock.expect(metadataRuleManager.getRulesWithDefault(EasyMock.anyString()))
.andReturn(ImmutableList.of(hotTier, coldTier)).atLeastOnce();
- EasyMock.expect(databaseSegmentManager.isStarted()).andReturn(true).anyTimes();
- EasyMock.expect(databaseSegmentManager.getDataSources()).andReturn(
- ImmutableList.of(druidDataSources[0].toImmutableDruidDataSource())
- ).atLeastOnce();
+
EasyMock.expect(serverInventoryView.getInventory())
.andReturn(ImmutableList.of(hotServer, coldServer))
.atLeastOnce();
EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes();
- EasyMock.replay(metadataRuleManager, databaseSegmentManager, serverInventoryView);
+ EasyMock.replay(metadataRuleManager, serverInventoryView);
coordinator.start();
leaderAnnouncerLatch.await(); // Wait for this coordinator to become leader
@@ -486,15 +479,8 @@ public class DruidCoordinatorTest extends CuratorTestBase
final CountDownLatch assignSegmentLatchHot = new CountDownLatch(2);
pathChildrenCache.getListenable().addListener(
(client, event) -> {
- if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
- DataSegment segment = dataSegments
- .entrySet()
- .stream()
- .filter(x -> event.getData().getPath().contains(x.getKey()))
- .map(Map.Entry::getValue)
- .findFirst()
- .orElse(null);
-
+ if (CuratorUtils.isChildAdded(event)) {
+ DataSegment segment = findSegmentRelatedToCuratorEvent(dataSegments, event);
if (segment != null) {
hotServer.addDataSegment(segment);
curator.delete().guaranteed().forPath(event.getData().getPath());
@@ -507,15 +493,9 @@ public class DruidCoordinatorTest extends CuratorTestBase
final CountDownLatch assignSegmentLatchCold = new CountDownLatch(1);
pathChildrenCacheCold.getListenable().addListener(
- (client, event) -> {
- if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
- DataSegment segment = dataSegments
- .entrySet()
- .stream()
- .filter(x -> event.getData().getPath().contains(x.getKey()))
- .map(Map.Entry::getValue)
- .findFirst()
- .orElse(null);
+ (CuratorFramework client, PathChildrenCacheEvent event) -> {
+ if (CuratorUtils.isChildAdded(event)) {
+ DataSegment segment = findSegmentRelatedToCuratorEvent(dataSegments, event);
if (segment != null) {
coldServer.addDataSegment(segment);
@@ -536,10 +516,11 @@ public class DruidCoordinatorTest extends CuratorTestBase
Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getLoadStatus());
- Map<String, Object2LongMap<String>> replicationStatus = coordinator.getReplicationStatus();
- Assert.assertEquals(2, replicationStatus.entrySet().size());
- Assert.assertEquals(0L, replicationStatus.get(hotTierName).getLong(dataSource));
- Assert.assertEquals(0L, replicationStatus.get(coldTierName).getLong(dataSource));
+ Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier =
+ coordinator.computeUnderReplicationCountsPerDataSourcePerTier();
+ Assert.assertEquals(2, underReplicationCountsPerDataSourcePerTier.size());
+ Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(hotTierName).getLong(dataSource));
+ Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(coldTierName).getLong(dataSource));
coordinator.stop();
leaderUnannouncerLatch.await();
@@ -549,52 +530,37 @@ public class DruidCoordinatorTest extends CuratorTestBase
EasyMock.verify(metadataRuleManager);
}
- @Test
- public void testOrderedAvailableDataSegments()
+ private void setupMetadataSegmentManagerMock(DruidDataSource dataSource)
{
- DruidDataSource dataSource = new DruidDataSource("test", new HashMap());
- DataSegment[] segments = new DataSegment[]{
- getSegment("test", Intervals.of("2016-01-10T03:00:00Z/2016-01-10T04:00:00Z")),
- getSegment("test", Intervals.of("2016-01-11T01:00:00Z/2016-01-11T02:00:00Z")),
- getSegment("test", Intervals.of("2016-01-09T10:00:00Z/2016-01-09T11:00:00Z")),
- getSegment("test", Intervals.of("2016-01-09T10:00:00Z/2016-01-09T12:00:00Z"))
- };
- for (DataSegment segment : segments) {
- dataSource.addSegment(segment);
- }
-
- EasyMock.expect(databaseSegmentManager.getDataSources()).andReturn(
- ImmutableList.of(dataSource.toImmutableDruidDataSource())
- ).atLeastOnce();
+ EasyMock.expect(databaseSegmentManager.isStarted()).andReturn(true).anyTimes();
+ EasyMock
+ .expect(databaseSegmentManager.iterateAllSegments())
+ .andReturn(dataSource.getSegments())
+ .anyTimes();
+ EasyMock
+ .expect(databaseSegmentManager.getDataSources())
+ .andReturn(Collections.singleton(dataSource.toImmutableDruidDataSource()))
+ .anyTimes();
+ EasyMock
+ .expect(databaseSegmentManager.getAllDataSourceNames())
+ .andReturn(Collections.singleton(dataSource.getName()))
+ .anyTimes();
EasyMock.replay(databaseSegmentManager);
- Set<DataSegment> availableSegments = coordinator.getOrderedAvailableDataSegments();
- DataSegment[] expected = new DataSegment[]{
- getSegment("test", Intervals.of("2016-01-11T01:00:00Z/2016-01-11T02:00:00Z")),
- getSegment("test", Intervals.of("2016-01-10T03:00:00Z/2016-01-10T04:00:00Z")),
- getSegment("test", Intervals.of("2016-01-09T10:00:00Z/2016-01-09T12:00:00Z")),
- getSegment("test", Intervals.of("2016-01-09T10:00:00Z/2016-01-09T11:00:00Z"))
- };
- Assert.assertEquals(expected.length, availableSegments.size());
- Assert.assertEquals(expected, availableSegments.toArray());
- EasyMock.verify(databaseSegmentManager);
}
-
- private DataSegment getSegment(String dataSource, Interval interval)
+ @Nullable
+ private static DataSegment findSegmentRelatedToCuratorEvent(
+ Map<String, DataSegment> dataSegments,
+ PathChildrenCacheEvent event
+ )
{
- // Not using EasyMock as it hampers the performance of multithreads.
- DataSegment segment = new DataSegment(
- dataSource,
- interval,
- "dummy_version",
- new ConcurrentHashMap<>(),
- new ArrayList<>(),
- new ArrayList<>(),
- null,
- 0,
- 0L
- );
- return segment;
+ return dataSegments
+ .entrySet()
+ .stream()
+ .filter(x -> event.getData().getPath().contains(x.getKey()))
+ .map(Map.Entry::getValue)
+ .findFirst()
+ .orElse(null);
}
private static class TestDruidLeaderSelector implements DruidLeaderSelector
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java
index 2ef3cdd44f1..001dc2af24c 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java
@@ -138,7 +138,7 @@ public class ReservoirSegmentSamplerTest
EasyMock.expect(druidServer1.getName()).andReturn("1").atLeastOnce();
EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
- EasyMock.expect(druidServer1.getSegments()).andReturn(segments1).anyTimes();
+ EasyMock.expect(druidServer1.getLazyAllSegments()).andReturn(segments1).anyTimes();
EasyMock.expect(druidServer1.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer1);
@@ -146,7 +146,7 @@ public class ReservoirSegmentSamplerTest
EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer2.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
- EasyMock.expect(druidServer2.getSegments()).andReturn(segments2).anyTimes();
+ EasyMock.expect(druidServer2.getLazyAllSegments()).andReturn(segments2).anyTimes();
EasyMock.expect(druidServer2.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer2);
@@ -154,7 +154,7 @@ public class ReservoirSegmentSamplerTest
EasyMock.expect(druidServer3.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer3.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(druidServer3.getMaxSize()).andReturn(100L).atLeastOnce();
- EasyMock.expect(druidServer3.getSegments()).andReturn(segments3).anyTimes();
+ EasyMock.expect(druidServer3.getLazyAllSegments()).andReturn(segments3).anyTimes();
EasyMock.expect(druidServer3.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer3);
@@ -162,7 +162,7 @@ public class ReservoirSegmentSamplerTest
EasyMock.expect(druidServer4.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer4.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(druidServer4.getMaxSize()).andReturn(100L).atLeastOnce();
- EasyMock.expect(druidServer4.getSegments()).andReturn(segments4).anyTimes();
+ EasyMock.expect(druidServer4.getLazyAllSegments()).andReturn(segments4).anyTimes();
EasyMock.expect(druidServer4.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer4);
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/cost/CachingCostBalancerStrategyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/cost/CachingCostBalancerStrategyTest.java
index fe7b321777e..b8699fb0866 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/cost/CachingCostBalancerStrategyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/cost/CachingCostBalancerStrategyTest.java
@@ -115,7 +115,7 @@ public class CachingCostBalancerStrategyTest
{
ClusterCostCache.Builder builder = ClusterCostCache.builder();
serverHolders.forEach(
- s -> s.getServer().getSegments().forEach(segment -> builder.addSegment(s.getServer().getName(), segment))
+ s -> s.getServer().getLazyAllSegments().forEach(segment -> builder.addSegment(s.getServer().getName(), segment))
);
return new CachingCostBalancerStrategy(builder.build(), listeningExecutorService);
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowedTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowedTest.java
index 996f3f9791b..0ae0ee6c07d 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowedTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowedTest.java
@@ -117,11 +117,12 @@ public class DruidCoordinatorCleanupOvershadowedTest
).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
));
- DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams.newBuilder()
- .withAvailableSegments(availableSegments)
- .withCoordinatorStats(new CoordinatorStats())
- .withDruidCluster(druidCluster)
- .build();
+ DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams
+ .newBuilder()
+ .withAvailableSegmentsInTest(availableSegments)
+ .withCoordinatorStats(new CoordinatorStats())
+ .withDruidCluster(druidCluster)
+ .build();
druidCoordinatorCleanupOvershadowed.run(params);
EasyMock.verify(coordinator, druidDataSource, druidServer);
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
index 83398d7f6dc..adb34479272 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
@@ -286,7 +286,7 @@ public class BroadcastDistributionRuleTest
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
- .withAvailableSegments(
+ .withAvailableSegmentsInTest(
smallSegment,
largeSegments.get(0),
largeSegments.get(1),
@@ -337,7 +337,7 @@ public class BroadcastDistributionRuleTest
.withDruidCluster(secondCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(secondCluster))
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
- .withAvailableSegments(
+ .withAvailableSegmentsInTest(
smallSegment,
largeSegments.get(0),
largeSegments.get(1)
@@ -366,7 +366,7 @@ public class BroadcastDistributionRuleTest
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
- .withAvailableSegments(
+ .withAvailableSegmentsInTest(
smallSegment,
largeSegments.get(0),
largeSegments.get(1),
@@ -404,7 +404,7 @@ public class BroadcastDistributionRuleTest
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
- .withAvailableSegments(
+ .withAvailableSegmentsInTest(
smallSegment,
largeSegments.get(0),
largeSegments.get(1),
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
index a8793b2a647..73d47ccc38b 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
@@ -182,7 +182,7 @@ public class LoadRuleTest
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
- .withAvailableSegments(segment).build(),
+ .withAvailableSegmentsInTest(segment).build(),
segment
);
@@ -253,7 +253,7 @@ public class LoadRuleTest
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
- .withAvailableSegments(segment).build(),
+ .withAvailableSegmentsInTest(segment).build(),
segment
);
@@ -303,7 +303,7 @@ public class LoadRuleTest
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
- .withAvailableSegments(segment).build(),
+ .withAvailableSegmentsInTest(segment).build(),
segment
);
@@ -393,7 +393,7 @@ public class LoadRuleTest
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
- .withAvailableSegments(segment).build(),
+ .withAvailableSegmentsInTest(segment).build(),
segment
);
@@ -482,7 +482,7 @@ public class LoadRuleTest
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
- .withAvailableSegments(segment).build(),
+ .withAvailableSegmentsInTest(segment).build(),
segment
);
@@ -541,7 +541,7 @@ public class LoadRuleTest
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
- .withAvailableSegments(segment).build(),
+ .withAvailableSegmentsInTest(segment).build(),
segment
);
@@ -614,7 +614,7 @@ public class LoadRuleTest
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
- .withAvailableSegments(segment).build(),
+ .withAvailableSegmentsInTest(segment).build(),
segment
);
@@ -671,7 +671,7 @@ public class LoadRuleTest
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
- .withAvailableSegments(dataSegment1, dataSegment2, dataSegment3)
+ .withAvailableSegmentsInTest(dataSegment1, dataSegment2, dataSegment3)
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsInNodeLoadingQueue(2).build())
.build();
@@ -728,7 +728,7 @@ public class LoadRuleTest
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
- .withAvailableSegments(segment).build(),
+ .withAvailableSegmentsInTest(segment).build(),
segment
);
@@ -785,7 +785,7 @@ public class LoadRuleTest
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
- .withAvailableSegments(segment).build(),
+ .withAvailableSegmentsInTest(segment).build(),
segment
);
@@ -838,7 +838,7 @@ public class LoadRuleTest
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
- .withAvailableSegments(segment1, segment2)
+ .withAvailableSegmentsInTest(segment1, segment2)
.build();
CoordinatorStats stats = rule.run(
null,
@@ -904,7 +904,7 @@ public class LoadRuleTest
.withReplicationManager(throttler)
.withBalancerStrategy(mockBalancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
- .withAvailableSegments(segment1)
+ .withAvailableSegmentsInTest(segment1)
.build();
CoordinatorStats stats = rule.run(
null,
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index e895113f850..6316b40f991 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -479,7 +479,7 @@ public class SystemSchema extends AbstractSchema
final List<ImmutableDruidServer> druidServers = serverView.getDruidServers();
final int serverSegmentsTableSize = SERVER_SEGMENTS_SIGNATURE.getRowOrder().size();
for (ImmutableDruidServer druidServer : druidServers) {
- for (DataSegment segment : druidServer.getSegments()) {
+ for (DataSegment segment : druidServer.getLazyAllSegments()) {
Object[] row = new Object[serverSegmentsTableSize];
row[0] = druidServer.getHost();
row[1] = segment.getId();
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
index e707c40aeb5..ba44081fc71 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
@@ -268,19 +268,15 @@ public class DruidSchemaTest extends CalciteTestBase
SegmentMetadataHolder updatedHolder = SegmentMetadataHolder.from(existingHolder).withNumRows(5).build();
schema.setSegmentMetadataHolder(existingSegment, updatedHolder);
// find a druidServer holding existingSegment
- final Pair<ImmutableDruidServer, DataSegment> pair = druidServers.stream()
- .flatMap(druidServer -> druidServer.getSegments()
- .stream()
- .filter(segment -> segment
- .equals(
- existingSegment))
- .map(segment -> Pair
- .of(
- druidServer,
- segment
- )))
- .findAny()
- .orElse(null);
+ final Pair<ImmutableDruidServer, DataSegment> pair = druidServers
+ .stream()
+ .flatMap(druidServer -> druidServer
+ .getLazyAllSegments().stream()
+ .filter(segment -> segment.equals(existingSegment))
+ .map(segment -> Pair.of(druidServer, segment))
+ )
+ .findAny()
+ .orElse(null);
Assert.assertNotNull(pair);
final ImmutableDruidServer server = pair.lhs;
Assert.assertNotNull(server);