diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java index abacb196b10..3256dc370b7 100644 --- a/server/src/main/java/io/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java @@ -272,7 +272,7 @@ public class CachingClusteredClient implements QueryRunner Hasher hasher = Hashing.sha1().newHasher(); boolean hasOnlyHistoricalSegments = true; for (Pair p : segments) { - if (!p.lhs.pick().getServer().isAssignable()) { + if (!p.lhs.pick().getServer().segmentReplicatable()) { hasOnlyHistoricalSegments = false; break; } @@ -429,7 +429,7 @@ public class CachingClusteredClient implements QueryRunner final MultipleSpecificSegmentSpec segmentSpec = new MultipleSpecificSegmentSpec(descriptors); final Sequence resultSeqToAdd; - if (!server.isAssignable() || !populateCache || isBySegment) { // Direct server queryable + if (!server.segmentReplicatable() || !populateCache || isBySegment) { // Direct server queryable if (!isBySegment) { resultSeqToAdd = clientQueryable.run(queryPlus.withQuerySegmentSpec(segmentSpec), responseContext); } else { diff --git a/server/src/main/java/io/druid/client/DruidServer.java b/server/src/main/java/io/druid/client/DruidServer.java index 0c903186ece..284a71a84d7 100644 --- a/server/src/main/java/io/druid/client/DruidServer.java +++ b/server/src/main/java/io/druid/client/DruidServer.java @@ -28,6 +28,7 @@ import com.google.common.collect.Maps; import io.druid.java.util.common.logger.Logger; import io.druid.server.DruidNode; import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.coordination.ServerType; import io.druid.timeline.DataSegment; import java.util.Collections; @@ -111,7 +112,7 @@ public class DruidServer implements Comparable return metadata.getMaxSize(); } - public String getType() + public ServerType getType() { return metadata.getType(); } @@ -121,9 +122,9 @@ public class DruidServer implements Comparable return metadata.getTier(); } - public boolean isAssignable() + public boolean segmentReplicatable() { - return metadata.isAssignable(); + return metadata.segmentReplicatable(); } public int getPriority() diff --git a/server/src/main/java/io/druid/client/ImmutableDruidDataSource.java b/server/src/main/java/io/druid/client/ImmutableDruidDataSource.java index 1c7435e0e75..c83d674e428 100644 --- a/server/src/main/java/io/druid/client/ImmutableDruidDataSource.java +++ b/server/src/main/java/io/druid/client/ImmutableDruidDataSource.java @@ -72,4 +72,15 @@ public class ImmutableDruidDataSource { return segmentsHolder; } + + @Override + public String toString() + { + // partitionNames is intentionally ignored because it is usually large + return "ImmutableDruidDataSource{" + + "name='" + name + + "', segments='" + segmentsHolder + + "', properties='" + properties + + "'}"; + } } diff --git a/server/src/main/java/io/druid/client/ImmutableDruidServer.java b/server/src/main/java/io/druid/client/ImmutableDruidServer.java index 534f91cad44..1851befe277 100644 --- a/server/src/main/java/io/druid/client/ImmutableDruidServer.java +++ b/server/src/main/java/io/druid/client/ImmutableDruidServer.java @@ -21,6 +21,7 @@ package io.druid.client; import com.google.common.collect.ImmutableMap; import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.coordination.ServerType; import io.druid.timeline.DataSegment; import java.util.Map; @@ -72,7 +73,7 @@ public class ImmutableDruidServer return 
metadata.getMaxSize(); } - public String getType() + public ServerType getType() { return metadata.getType(); } @@ -106,4 +107,15 @@ public class ImmutableDruidServer { return segments; } + + @Override + public String toString() + { + // segments is intentionally ignored because it is usually large + return "ImmutableDruidServer{" + + "meta='" + metadata + + "', size='" + currSize + + "', sources='" + dataSources + + "'}"; + } } diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index 9e5ea1761c9..15ac0971269 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -20,6 +20,7 @@ package io.druid.segment.realtime; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Supplier; import com.google.common.base.Throwables; @@ -84,6 +85,7 @@ public class RealtimeManager implements QuerySegmentWalker this(fireDepartments, conglomerate, serverAnnouncer, Maps.newHashMap()); } + @VisibleForTesting RealtimeManager( List fireDepartments, QueryRunnerFactoryConglomerate conglomerate, @@ -94,7 +96,7 @@ public class RealtimeManager implements QuerySegmentWalker this.fireDepartments = fireDepartments; this.conglomerate = conglomerate; this.serverAnnouncer = serverAnnouncer; - this.chiefs = chiefs; + this.chiefs = chiefs == null ? Maps.newHashMap() : Maps.newHashMap(chiefs); } @LifecycleStart diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java b/server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java index f77c3d92ffa..b2bdead3ef2 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java @@ -148,7 +148,7 @@ public class CoordinatorBasedSegmentHandoffNotifier implements SegmentHandoffNot @Override public boolean apply(DruidServerMetadata input) { - return input.isAssignable(); + return input.segmentReplicatable(); } } )) { diff --git a/server/src/main/java/io/druid/server/SegmentManager.java b/server/src/main/java/io/druid/server/SegmentManager.java new file mode 100644 index 00000000000..9013ed66307 --- /dev/null +++ b/server/src/main/java/io/druid/server/SegmentManager.java @@ -0,0 +1,196 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package io.druid.server;
+
+import com.google.common.collect.Ordering;
+import com.google.inject.Inject;
+import com.metamx.emitter.EmittingLogger;
+import io.druid.collections.CountingMap;
+import io.druid.segment.ReferenceCountingSegment;
+import io.druid.segment.Segment;
+import io.druid.segment.loading.SegmentLoader;
+import io.druid.segment.loading.SegmentLoadingException;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.VersionedIntervalTimeline;
+import io.druid.timeline.partition.PartitionChunk;
+import io.druid.timeline.partition.PartitionHolder;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SegmentManager
+{
+  private static final EmittingLogger log = new EmittingLogger(SegmentManager.class);
+
+  private final Object lock = new Object();
+  private final SegmentLoader segmentLoader;
+  private final Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> dataSources = new HashMap<>();
+  private final CountingMap<String> dataSourceSizes = new CountingMap<>();
+  private final CountingMap<String> dataSourceCounts = new CountingMap<>();
+
+  @Inject
+  public SegmentManager(
+      SegmentLoader segmentLoader
+  )
+  {
+    this.segmentLoader = segmentLoader;
+  }
+
+  public Map<String, Long> getDataSourceSizes()
+  {
+    synchronized (dataSourceSizes) {
+      return dataSourceSizes.snapshot();
+    }
+  }
+
+  public Map<String, Long> getDataSourceCounts()
+  {
+    synchronized (dataSourceCounts) {
+      return dataSourceCounts.snapshot();
+    }
+  }
+
+  public boolean isSegmentCached(final DataSegment segment) throws SegmentLoadingException
+  {
+    return segmentLoader.isSegmentLoaded(segment);
+  }
+
+  public VersionedIntervalTimeline<String, ReferenceCountingSegment> getTimeline(String dataSource)
+  {
+    synchronized (lock) {
+      return dataSources.get(dataSource);
+    }
+  }
+
+  /**
+   * Load a single segment.
+   *
+   * @param segment segment to load
+   *
+   * @return true if the segment was newly loaded, false if it was already loaded
+   *
+   * @throws SegmentLoadingException if the segment cannot be loaded
+   */
+  public boolean loadSegment(final DataSegment segment) throws SegmentLoadingException
+  {
+    final Segment adapter = getAdapter(segment);
+
+    synchronized (lock) {
+      final String dataSource = segment.getDataSource();
+      final VersionedIntervalTimeline<String, ReferenceCountingSegment> loadedIntervals = dataSources.computeIfAbsent(
+          dataSource,
+          k -> new VersionedIntervalTimeline<>(Ordering.natural())
+      );
+
+      final PartitionHolder<ReferenceCountingSegment> entry = loadedIntervals.findEntry(
+          segment.getInterval(),
+          segment.getVersion()
+      );
+      if ((entry != null) && (entry.getChunk(segment.getShardSpec().getPartitionNum()) != null)) {
+        log.warn("Told to load an adapter for a segment[%s] that already exists", segment.getIdentifier());
+        return false;
+      }
+
+      loadedIntervals.add(
+          segment.getInterval(),
+          segment.getVersion(),
+          segment.getShardSpec().createChunk(new ReferenceCountingSegment(adapter))
+      );
+      synchronized (dataSourceSizes) {
+        dataSourceSizes.add(dataSource, segment.getSize());
+      }
+      synchronized (dataSourceCounts) {
+        dataSourceCounts.add(dataSource, 1L);
+      }
+      return true;
+    }
+  }
+
+  private Segment getAdapter(final DataSegment segment) throws SegmentLoadingException
+  {
+    final Segment adapter;
+    try {
+      adapter = segmentLoader.getSegment(segment);
+    }
+    catch (SegmentLoadingException e) {
+      try {
+        segmentLoader.cleanup(segment);
+      }
+      catch (SegmentLoadingException e1) {
+        e.addSuppressed(e1);
+      }
+      throw e;
+    }
+
+    if (adapter == null) {
+      throw new SegmentLoadingException("Null adapter from loadSpec[%s]", segment.getLoadSpec());
+    }
+    return adapter;
+  }
+
+  public void dropSegment(final DataSegment segment)
throws SegmentLoadingException + { + String dataSource = segment.getDataSource(); + synchronized (lock) { + VersionedIntervalTimeline loadedIntervals = dataSources.get(dataSource); + + if (loadedIntervals == null) { + log.info("Told to delete a queryable for a dataSource[%s] that doesn't exist.", dataSource); + return; + } + + PartitionChunk removed = loadedIntervals.remove( + segment.getInterval(), + segment.getVersion(), + segment.getShardSpec().createChunk(null) + ); + ReferenceCountingSegment oldQueryable = (removed == null) ? null : removed.getObject(); + + if (oldQueryable != null) { + synchronized (dataSourceSizes) { + dataSourceSizes.add(dataSource, -segment.getSize()); + } + synchronized (dataSourceCounts) { + dataSourceCounts.add(dataSource, -1L); + } + + try { + log.info("Attempting to close segment %s", segment.getIdentifier()); + oldQueryable.close(); + } + catch (IOException e) { + log.makeAlert(e, "Exception closing segment") + .addData("dataSource", dataSource) + .addData("segmentId", segment.getIdentifier()) + .emit(); + } + } else { + log.info( + "Told to delete a queryable on dataSource[%s] for interval[%s] and version [%s] that I don't have.", + dataSource, + segment.getInterval(), + segment.getVersion() + ); + } + } + segmentLoader.cleanup(segment); + } +} diff --git a/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java b/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java index bcbe9923798..b73322f726e 100644 --- a/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java +++ b/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java @@ -310,7 +310,7 @@ public class BatchDataSegmentAnnouncer implements DataSegmentAnnouncer return makeServedSegmentPath( UUIDUtils.generateUuid( server.getHost(), - server.getType(), + server.getType().toString(), server.getTier(), new DateTime().toString() ) diff --git a/server/src/main/java/io/druid/server/coordination/DruidServerMetadata.java b/server/src/main/java/io/druid/server/coordination/DruidServerMetadata.java index 9094d100998..c209ecc7052 100644 --- a/server/src/main/java/io/druid/server/coordination/DruidServerMetadata.java +++ b/server/src/main/java/io/druid/server/coordination/DruidServerMetadata.java @@ -30,7 +30,7 @@ public class DruidServerMetadata private final String host; private final long maxSize; private final String tier; - private final String type; + private final ServerType type; private final int priority; @JsonCreator @@ -47,7 +47,7 @@ public class DruidServerMetadata this.host = host; this.maxSize = maxSize; this.tier = tier; - this.type = type; + this.type = ServerType.fromString(type); this.priority = priority; } @@ -76,7 +76,7 @@ public class DruidServerMetadata } @JsonProperty - public String getType() + public ServerType getType() { return type; } @@ -87,9 +87,9 @@ public class DruidServerMetadata return priority; } - public boolean isAssignable() + public boolean segmentReplicatable() { - return getType().equalsIgnoreCase("historical") || getType().equalsIgnoreCase("bridge"); + return type.isSegmentReplicationTarget(); } @Override diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java index 06406c284c8..9b906325cba 100644 --- a/server/src/main/java/io/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java @@ -22,14 +22,12 @@ package 
io.druid.server.coordination; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.collect.Iterables; -import com.google.common.collect.Ordering; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import io.druid.client.CachingQueryRunner; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; -import io.druid.collections.CountingMap; import io.druid.guice.annotations.BackgroundCaching; import io.druid.guice.annotations.Processing; import io.druid.guice.annotations.Smile; @@ -55,10 +53,7 @@ import io.druid.query.TableDataSource; import io.druid.query.spec.SpecificSegmentQueryRunner; import io.druid.query.spec.SpecificSegmentSpec; import io.druid.segment.ReferenceCountingSegment; -import io.druid.segment.Segment; -import io.druid.segment.loading.SegmentLoader; -import io.druid.segment.loading.SegmentLoadingException; -import io.druid.timeline.DataSegment; +import io.druid.server.SegmentManager; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.partition.PartitionChunk; @@ -66,11 +61,8 @@ import io.druid.timeline.partition.PartitionHolder; import org.joda.time.Interval; import javax.annotation.Nullable; -import java.io.IOException; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; @@ -79,32 +71,27 @@ import java.util.concurrent.atomic.AtomicLong; public class ServerManager implements QuerySegmentWalker { private static final EmittingLogger log = new EmittingLogger(ServerManager.class); - private final Object lock = new Object(); - private final SegmentLoader segmentLoader; private final QueryRunnerFactoryConglomerate conglomerate; private final ServiceEmitter emitter; private final ExecutorService exec; private final ExecutorService cachingExec; - private final Map> dataSources; - private final CountingMap dataSourceSizes = new CountingMap(); - private final CountingMap dataSourceCounts = new CountingMap(); private final Cache cache; private final ObjectMapper objectMapper; private final CacheConfig cacheConfig; + private final SegmentManager segmentManager; @Inject public ServerManager( - SegmentLoader segmentLoader, QueryRunnerFactoryConglomerate conglomerate, ServiceEmitter emitter, @Processing ExecutorService exec, @BackgroundCaching ExecutorService cachingExec, @Smile ObjectMapper objectMapper, Cache cache, - CacheConfig cacheConfig + CacheConfig cacheConfig, + SegmentManager segmentManager ) { - this.segmentLoader = segmentLoader; this.conglomerate = conglomerate; this.emitter = emitter; @@ -113,137 +100,8 @@ public class ServerManager implements QuerySegmentWalker this.cache = cache; this.objectMapper = objectMapper; - this.dataSources = new HashMap<>(); this.cacheConfig = cacheConfig; - } - - public Map getDataSourceSizes() - { - synchronized (dataSourceSizes) { - return dataSourceSizes.snapshot(); - } - } - - public Map getDataSourceCounts() - { - synchronized (dataSourceCounts) { - return dataSourceCounts.snapshot(); - } - } - - public boolean isSegmentCached(final DataSegment segment) throws SegmentLoadingException - { - return segmentLoader.isSegmentLoaded(segment); - } - - /** - * Load a single segment. 
- * - * @param segment segment to load - * - * @return true if the segment was newly loaded, false if it was already loaded - * - * @throws SegmentLoadingException if the segment cannot be loaded - */ - public boolean loadSegment(final DataSegment segment) throws SegmentLoadingException - { - final Segment adapter; - try { - adapter = segmentLoader.getSegment(segment); - } - catch (SegmentLoadingException e) { - try { - segmentLoader.cleanup(segment); - } - catch (SegmentLoadingException e1) { - // ignore - } - throw e; - } - - if (adapter == null) { - throw new SegmentLoadingException("Null adapter from loadSpec[%s]", segment.getLoadSpec()); - } - - synchronized (lock) { - String dataSource = segment.getDataSource(); - VersionedIntervalTimeline loadedIntervals = dataSources.get(dataSource); - - if (loadedIntervals == null) { - loadedIntervals = new VersionedIntervalTimeline<>(Ordering.natural()); - dataSources.put(dataSource, loadedIntervals); - } - - PartitionHolder entry = loadedIntervals.findEntry( - segment.getInterval(), - segment.getVersion() - ); - if ((entry != null) && (entry.getChunk(segment.getShardSpec().getPartitionNum()) != null)) { - log.warn("Told to load a adapter for a segment[%s] that already exists", segment.getIdentifier()); - return false; - } - - loadedIntervals.add( - segment.getInterval(), - segment.getVersion(), - segment.getShardSpec().createChunk(new ReferenceCountingSegment(adapter)) - ); - synchronized (dataSourceSizes) { - dataSourceSizes.add(dataSource, segment.getSize()); - } - synchronized (dataSourceCounts) { - dataSourceCounts.add(dataSource, 1L); - } - return true; - } - } - - public void dropSegment(final DataSegment segment) throws SegmentLoadingException - { - String dataSource = segment.getDataSource(); - synchronized (lock) { - VersionedIntervalTimeline loadedIntervals = dataSources.get(dataSource); - - if (loadedIntervals == null) { - log.info("Told to delete a queryable for a dataSource[%s] that doesn't exist.", dataSource); - return; - } - - PartitionChunk removed = loadedIntervals.remove( - segment.getInterval(), - segment.getVersion(), - segment.getShardSpec().createChunk((ReferenceCountingSegment) null) - ); - ReferenceCountingSegment oldQueryable = (removed == null) ? 
null : removed.getObject();
-
-      if (oldQueryable != null) {
-        synchronized (dataSourceSizes) {
-          dataSourceSizes.add(dataSource, -segment.getSize());
-        }
-        synchronized (dataSourceCounts) {
-          dataSourceCounts.add(dataSource, -1L);
-        }
-
-        try {
-          log.info("Attempting to close segment %s", segment.getIdentifier());
-          oldQueryable.close();
-        }
-        catch (IOException e) {
-          log.makeAlert(e, "Exception closing segment")
-             .addData("dataSource", dataSource)
-             .addData("segmentId", segment.getIdentifier())
-             .emit();
-        }
-      } else {
-        log.info(
-            "Told to delete a queryable on dataSource[%s] for interval[%s] and version [%s] that I don't have.",
-            dataSource,
-            segment.getInterval(),
-            segment.getVersion()
-        );
-      }
-    }
-    segmentLoader.cleanup(segment);
+    this.segmentManager = segmentManager;
   }
 
   @Override
@@ -263,7 +121,9 @@ public class ServerManager implements QuerySegmentWalker
     }
 
     String dataSourceName = getDataSourceName(dataSource);
-    final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(dataSourceName);
+    final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = segmentManager.getTimeline(
+        dataSourceName
+    );
 
     if (timeline == null) {
       return new NoopQueryRunner<T>();
@@ -352,7 +212,7 @@ public class ServerManager implements QuerySegmentWalker
 
     String dataSourceName = getDataSourceName(query.getDataSource());
 
-    final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(
+    final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = segmentManager.getTimeline(
         dataSourceName
     );
diff --git a/server/src/main/java/io/druid/server/coordination/ServerType.java b/server/src/main/java/io/druid/server/coordination/ServerType.java
new file mode 100644
index 00000000000..39e4d0161c8
--- /dev/null
+++ b/server/src/main/java/io/druid/server/coordination/ServerType.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.server.coordination;
+
+public enum ServerType
+{
+  HISTORICAL,
+  BRIDGE,
+  REALTIME {
+    @Override
+    public boolean isSegmentReplicationTarget()
+    {
+      return false;
+    }
+  };
+
+  /**
+   * Indicates this type of node is able to be a target of segment replication.
+   *
+   * @return true if it is available for replication
+   *
+   * @see io.druid.server.coordinator.rules.LoadRule
+   */
+  boolean isSegmentReplicationTarget()
+  {
+    return true;
+  }
+
+  /**
+   * Indicates this type of node is able to be a target of segment broadcast.
+   *
+   * @return true if it is available for broadcast.
+ */ + boolean isSegmentBroadcastTarget() + { + return true; + } + + static ServerType fromString(String type) + { + return ServerType.valueOf(type.toUpperCase()); + } +} diff --git a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java index baa37403d65..6286e516b61 100644 --- a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java +++ b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java @@ -34,6 +34,7 @@ import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStop; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoadingException; +import io.druid.server.SegmentManager; import io.druid.server.initialization.ZkPathsConfig; import io.druid.timeline.DataSegment; import org.apache.curator.framework.CuratorFramework; @@ -73,7 +74,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler private final CuratorFramework curator; private final DataSegmentAnnouncer announcer; private final DataSegmentServerAnnouncer serverAnnouncer; - private final ServerManager serverManager; + private final SegmentManager segmentManager; private final ScheduledExecutorService exec; private final ConcurrentSkipListSet segmentsToDelete; @@ -90,7 +91,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler DataSegmentAnnouncer announcer, DataSegmentServerAnnouncer serverAnnouncer, CuratorFramework curator, - ServerManager serverManager, + SegmentManager segmentManager, ScheduledExecutorFactory factory ) { @@ -101,7 +102,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler this.curator = curator; this.announcer = announcer; this.serverAnnouncer = serverAnnouncer; - this.serverManager = serverManager; + this.segmentManager = segmentManager; this.exec = factory.create(1, "ZkCoordinator-Exec--%d"); this.segmentsToDelete = new ConcurrentSkipListSet<>(); @@ -267,7 +268,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler if (!segment.getIdentifier().equals(file.getName())) { log.warn("Ignoring cache file[%s] for segment[%s].", file.getPath(), segment.getIdentifier()); ignored++; - } else if (serverManager.isSegmentCached(segment)) { + } else if (segmentManager.isSegmentCached(segment)) { cachedSegments.add(segment); } else { log.warn("Unable to find cache file for %s. 
Deleting lookup entry", segment.getIdentifier()); @@ -319,7 +320,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler { final boolean loaded; try { - loaded = serverManager.loadSegment(segment); + loaded = segmentManager.loadSegment(segment); } catch (Exception e) { removeSegment(segment, callback); @@ -477,7 +478,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler try { synchronized (lock) { if (segmentsToDelete.remove(segment)) { - serverManager.dropSegment(segment); + segmentManager.dropSegment(segment); File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); if (!segmentInfoCacheFile.delete()) { diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCluster.java b/server/src/main/java/io/druid/server/coordinator/DruidCluster.java index 75ec66e5f39..5021d3abc9c 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCluster.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCluster.java @@ -19,15 +19,18 @@ package io.druid.server.coordinator; -import com.google.common.collect.Maps; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.Ordering; import io.druid.client.ImmutableDruidServer; +import io.druid.java.util.common.IAE; +import javax.annotation.Nullable; import java.util.Collection; -import java.util.List; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; -import java.util.stream.Collectors; +import java.util.Set; /** * Contains a representation of the current state of the cluster by tier. @@ -35,67 +38,107 @@ import java.util.stream.Collectors; */ public class DruidCluster { - private final Map> cluster; + private final Set realtimes; + private final Map> historicals; public DruidCluster() { - this.cluster = Maps.newHashMap(); + this.realtimes = new HashSet<>(); + this.historicals = new HashMap<>(); } - public DruidCluster(Map> cluster) + @VisibleForTesting + public DruidCluster( + @Nullable Set realtimes, + Map> historicals + ) { - this.cluster = cluster; + this.realtimes = realtimes == null ? 
new HashSet<>() : new HashSet<>(realtimes); + this.historicals = historicals; } public void add(ServerHolder serverHolder) { - ImmutableDruidServer server = serverHolder.getServer(); - MinMaxPriorityQueue tierServers = cluster.get(server.getTier()); - if (tierServers == null) { - tierServers = MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(); - cluster.put(server.getTier(), tierServers); + switch (serverHolder.getServer().getType()) { + case HISTORICAL: + addHistorical(serverHolder); + break; + case REALTIME: + addRealtime(serverHolder); + break; + case BRIDGE: + addHistorical(serverHolder); + break; + default: + throw new IAE("unknown server type[%s]", serverHolder.getServer().getType()); } + } + + private void addRealtime(ServerHolder serverHolder) + { + realtimes.add(serverHolder); + } + + private void addHistorical(ServerHolder serverHolder) + { + final ImmutableDruidServer server = serverHolder.getServer(); + final MinMaxPriorityQueue tierServers = historicals.computeIfAbsent( + server.getTier(), + k -> MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create() + ); tierServers.add(serverHolder); } - public Map> getCluster() + public Set getRealtimes() { - return cluster; + return realtimes; + } + + public Map> getHistoricals() + { + return historicals; } public Iterable getTierNames() { - return cluster.keySet(); + return historicals.keySet(); } - public MinMaxPriorityQueue getServersByTier(String tier) + public MinMaxPriorityQueue getHistoricalsByTier(String tier) { - return cluster.get(tier); + return historicals.get(tier); } - public List getAllServers() + public Collection getAllServers() { - return cluster.values().stream().flatMap(Collection::stream).collect(Collectors.toList()); + return historicals.values().stream() + .flatMap(Collection::stream) + .collect(() -> realtimes, Set::add, Set::addAll); } - public Iterable> getSortedServersByTier() + public Iterable> getSortedHistoricalsByTier() { - return cluster.values(); + return historicals.values(); } public boolean isEmpty() { - return cluster.isEmpty(); + return historicals.isEmpty() && realtimes.isEmpty(); + } + + public boolean hasHistoricals() + { + return !historicals.isEmpty(); + } + + public boolean hasRealtimes() + { + return !realtimes.isEmpty(); } public boolean hasTier(String tier) { - MinMaxPriorityQueue servers = cluster.get(tier); + MinMaxPriorityQueue servers = historicals.get(tier); return (servers == null) || servers.isEmpty(); } - - public MinMaxPriorityQueue get(String tier) - { - return cluster.get(tier); - } } diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index 86636d7eab9..23413af1581 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -751,7 +751,7 @@ public class DruidCoordinator DruidServer input ) { - return input.isAssignable(); + return input.segmentReplicatable(); } } ).transform( diff --git a/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java b/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java index 64b76b8fd07..785fce77fee 100644 --- a/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java +++ b/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java @@ -38,7 +38,7 @@ public class SegmentReplicantLookup final Table segmentsInCluster = 
HashBasedTable.create(); final Table loadingSegments = HashBasedTable.create(); - for (MinMaxPriorityQueue serversByType : cluster.getSortedServersByTier()) { + for (MinMaxPriorityQueue serversByType : cluster.getSortedHistoricalsByTier()) { for (ServerHolder serverHolder : serversByType) { ImmutableDruidServer server = serverHolder.getServer(); diff --git a/server/src/main/java/io/druid/server/coordinator/ServerHolder.java b/server/src/main/java/io/druid/server/coordinator/ServerHolder.java index 7f2f1369374..12b14fc3cba 100644 --- a/server/src/main/java/io/druid/server/coordinator/ServerHolder.java +++ b/server/src/main/java/io/druid/server/coordinator/ServerHolder.java @@ -23,6 +23,8 @@ import io.druid.client.ImmutableDruidServer; import io.druid.java.util.common.logger.Logger; import io.druid.timeline.DataSegment; +import java.util.Objects; + /** */ public class ServerHolder implements Comparable @@ -122,21 +124,20 @@ public class ServerHolder implements Comparable ServerHolder that = (ServerHolder) o; - if (peon != null ? !peon.equals(that.peon) : that.peon != null) { - return false; - } - if (server != null ? !server.equals(that.server) : that.server != null) { + if (!this.server.getHost().equals(that.server.getHost())) { return false; } - return true; + if (!this.server.getTier().equals(that.getServer().getTier())) { + return false; + } + + return this.server.getType().equals(that.getServer().getType()); } @Override public int hashCode() { - int result = server != null ? server.hashCode() : 0; - result = 31 * result + (peon != null ? peon.hashCode() : 0); - return result; + return Objects.hash(server.getHost(), server.getTier(), server.getType()); } } diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java index 67e750dd927..d503c1d6a93 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java @@ -88,7 +88,7 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper final int maxSegmentsToMove = params.getCoordinatorDynamicConfig().getMaxSegmentsToMove(); for (Map.Entry> entry : - params.getDruidCluster().getCluster().entrySet()) { + params.getDruidCluster().getHistoricals().entrySet()) { String tier = entry.getKey(); if (currentlyMovingSegments.get(tier) == null) { diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java index 68a83c3a919..2819a51ec4c 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java @@ -55,7 +55,7 @@ public class DruidCoordinatorCleanupOvershadowed implements DruidCoordinatorHelp DruidCluster cluster = params.getDruidCluster(); Map> timelines = Maps.newHashMap(); - for (MinMaxPriorityQueue serverHolders : cluster.getSortedServersByTier()) { + for (MinMaxPriorityQueue serverHolders : cluster.getSortedHistoricalsByTier()) { for (ServerHolder serverHolder : serverHolders) { ImmutableDruidServer server = serverHolder.getServer(); diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java 
b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java index ee4fcc3dd79..89b4deb3175 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java @@ -64,7 +64,7 @@ public class DruidCoordinatorCleanupUnneeded implements DruidCoordinatorHelper // This is done to prevent a race condition in which the coordinator would drop all segments if it started running // cleanup before it finished polling the metadata storage for available segments for the first time. if (!availableSegments.isEmpty()) { - for (MinMaxPriorityQueue serverHolders : cluster.getSortedServersByTier()) { + for (MinMaxPriorityQueue serverHolders : cluster.getSortedHistoricalsByTier()) { for (ServerHolder serverHolder : serverHolders) { ImmutableDruidServer server = serverHolder.getServer(); diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorLogger.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorLogger.java index 55ab2f0a84d..c8b722adb8a 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorLogger.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorLogger.java @@ -84,7 +84,7 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper for (Map.Entry entry : assigned.entrySet()) { log.info( "[%s] : Assigned %s segments among %,d servers", - entry.getKey(), entry.getValue().get(), cluster.get(entry.getKey()).size() + entry.getKey(), entry.getValue().get(), cluster.getHistoricalsByTier(entry.getKey()).size() ); } } @@ -99,7 +99,7 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper for (Map.Entry entry : dropped.entrySet()) { log.info( "[%s] : Dropped %s segments among %,d servers", - entry.getKey(), entry.getValue().get(), cluster.get(entry.getKey()).size() + entry.getKey(), entry.getValue().get(), cluster.getHistoricalsByTier(entry.getKey()).size() ); } } @@ -152,7 +152,7 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper for (Map.Entry entry : unneeded.entrySet()) { log.info( "[%s] : Removed %s unneeded segments among %,d servers", - entry.getKey(), entry.getValue().get(), cluster.get(entry.getKey()).size() + entry.getKey(), entry.getValue().get(), cluster.getHistoricalsByTier(entry.getKey()).size() ); } } @@ -187,14 +187,14 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper } } log.info("Load Queues:"); - for (MinMaxPriorityQueue serverHolders : cluster.getSortedServersByTier()) { + for (MinMaxPriorityQueue serverHolders : cluster.getSortedHistoricalsByTier()) { for (ServerHolder serverHolder : serverHolders) { ImmutableDruidServer server = serverHolder.getServer(); LoadQueuePeon queuePeon = serverHolder.getPeon(); log.info( "Server[%s, %s, %s] has %,d left to load, %,d left to drop, %,d bytes queued, %,d bytes served.", server.getName(), - server.getType(), + server.getType().toString(), server.getTier(), queuePeon.getSegmentsToLoad().size(), queuePeon.getSegmentsToDrop().size(), diff --git a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java index c260ec23bbc..6d83e4260d7 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java @@ -23,7 +23,6 @@ import 
com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.MinMaxPriorityQueue; import com.metamx.emitter.EmittingLogger; - import io.druid.java.util.common.IAE; import io.druid.server.coordinator.BalancerStrategy; import io.druid.server.coordinator.CoordinatorStats; @@ -64,7 +63,7 @@ public abstract class LoadRule implements Rule final int loadedReplicantsInTier = params.getSegmentReplicantLookup() .getLoadedReplicants(segment.getIdentifier(), tier); - final MinMaxPriorityQueue serverQueue = params.getDruidCluster().getServersByTier(tier); + final MinMaxPriorityQueue serverQueue = params.getDruidCluster().getHistoricalsByTier(tier); if (serverQueue == null) { log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", tier).emit(); continue; @@ -176,8 +175,6 @@ public abstract class LoadRule implements Rule } } - final ReplicationThrottler replicationManager = params.getReplicationManager(); - // Find all instances of this segment across tiers Map replicantsByTier = params.getSegmentReplicantLookup().getClusterTiers(segment.getIdentifier()); @@ -188,7 +185,7 @@ public abstract class LoadRule implements Rule stats.addToTieredStat(DROPPED_COUNT, tier, 0); - MinMaxPriorityQueue serverQueue = params.getDruidCluster().get(tier); + MinMaxPriorityQueue serverQueue = params.getDruidCluster().getHistoricalsByTier(tier); if (serverQueue == null) { log.makeAlert("No holders found for tier[%s]", entry.getKey()).emit(); continue; diff --git a/server/src/main/java/io/druid/server/metrics/HistoricalMetricsMonitor.java b/server/src/main/java/io/druid/server/metrics/HistoricalMetricsMonitor.java index cb3bb6b7f06..ec389cf53bb 100644 --- a/server/src/main/java/io/druid/server/metrics/HistoricalMetricsMonitor.java +++ b/server/src/main/java/io/druid/server/metrics/HistoricalMetricsMonitor.java @@ -26,7 +26,7 @@ import com.metamx.metrics.AbstractMonitor; import io.druid.client.DruidServerConfig; import io.druid.java.util.common.collect.CountingMap; import io.druid.query.DruidMetrics; -import io.druid.server.coordination.ServerManager; +import io.druid.server.SegmentManager; import io.druid.server.coordination.ZkCoordinator; import io.druid.timeline.DataSegment; @@ -35,18 +35,18 @@ import java.util.Map; public class HistoricalMetricsMonitor extends AbstractMonitor { private final DruidServerConfig serverConfig; - private final ServerManager serverManager; + private final SegmentManager segmentManager; private final ZkCoordinator zkCoordinator; @Inject public HistoricalMetricsMonitor( DruidServerConfig serverConfig, - ServerManager serverManager, + SegmentManager segmentManager, ZkCoordinator zkCoordinator ) { this.serverConfig = serverConfig; - this.serverManager = serverManager; + this.segmentManager = segmentManager; this.zkCoordinator = zkCoordinator; } @@ -73,7 +73,7 @@ public class HistoricalMetricsMonitor extends AbstractMonitor ); } - for (Map.Entry entry : serverManager.getDataSourceSizes().entrySet()) { + for (Map.Entry entry : segmentManager.getDataSourceSizes().entrySet()) { String dataSource = entry.getKey(); long used = entry.getValue(); @@ -88,7 +88,7 @@ public class HistoricalMetricsMonitor extends AbstractMonitor emitter.emit(builder.build("segment/usedPercent", usedPercent)); } - for (Map.Entry entry : serverManager.getDataSourceCounts().entrySet()) { + for (Map.Entry entry : segmentManager.getDataSourceCounts().entrySet()) { String dataSource = entry.getKey(); long count = entry.getValue(); final ServiceMetricEvent.Builder builder 
= diff --git a/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java b/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java index 39119987a09..7bc239f359c 100644 --- a/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java +++ b/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java @@ -124,7 +124,7 @@ public class BatchServerInventoryViewTest "id", "host", Long.MAX_VALUE, - "type", + "historical", "tier", 0 ); @@ -443,7 +443,7 @@ public class BatchServerInventoryViewTest "id", "host", Long.MAX_VALUE, - "type", + "historical", "tier", 0 ), diff --git a/server/src/test/java/io/druid/client/client/ImmutableSegmentLoadInfoTest.java b/server/src/test/java/io/druid/client/client/ImmutableSegmentLoadInfoTest.java index 26f09e86b8c..c291a272318 100644 --- a/server/src/test/java/io/druid/client/client/ImmutableSegmentLoadInfoTest.java +++ b/server/src/test/java/io/druid/client/client/ImmutableSegmentLoadInfoTest.java @@ -51,7 +51,7 @@ public class ImmutableSegmentLoadInfoTest null, NoneShardSpec.instance(), 0, 0 - ), Sets.newHashSet(new DruidServerMetadata("a", "host", 10, "type", "tier", 1)) + ), Sets.newHashSet(new DruidServerMetadata("a", "host", 10, "historical", "tier", 1)) ); ImmutableSegmentLoadInfo serde = mapper.readValue( diff --git a/server/src/test/java/io/druid/server/ClientInfoResourceTest.java b/server/src/test/java/io/druid/server/ClientInfoResourceTest.java index af02fad8e66..fd0bdd3da72 100644 --- a/server/src/test/java/io/druid/server/ClientInfoResourceTest.java +++ b/server/src/test/java/io/druid/server/ClientInfoResourceTest.java @@ -79,7 +79,7 @@ public class ClientInfoResourceTest public void setup() { VersionedIntervalTimeline timeline = new VersionedIntervalTimeline<>(Ordering.natural()); - DruidServer server = new DruidServer("name", "host", 1234, "type", "tier", 0); + DruidServer server = new DruidServer("name", "host", 1234, "historical", "tier", 0); addSegment(timeline, server, "1960-02-13/1961-02-14", ImmutableList.of("d5"), ImmutableList.of("m5"), "v0"); diff --git a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java index 502156e5e90..2deda1ea5f2 100644 --- a/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/io/druid/server/coordination/ServerManagerTest.java @@ -64,6 +64,7 @@ import io.druid.segment.Segment; import io.druid.segment.StorageAdapter; import io.druid.segment.loading.SegmentLoader; import io.druid.segment.loading.SegmentLoadingException; +import io.druid.server.SegmentManager; import io.druid.server.metrics.NoopServiceEmitter; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; @@ -95,6 +96,7 @@ public class ServerManagerTest private CountDownLatch queryWaitYieldLatch; private CountDownLatch queryNotifyLatch; private ExecutorService serverManagerExec; + private SegmentManager segmentManager; @Before public void setUp() throws IOException @@ -106,7 +108,7 @@ public class ServerManagerTest queryNotifyLatch = new CountDownLatch(1); factory = new MyQueryRunnerFactory(queryWaitLatch, queryWaitYieldLatch, queryNotifyLatch); serverManagerExec = Executors.newFixedThreadPool(2); - serverManager = new ServerManager( + segmentManager = new SegmentManager( new SegmentLoader() { @Override @@ -135,7 +137,9 @@ public class ServerManagerTest { } - }, + } + ); + serverManager = new ServerManager( 
new QueryRunnerFactoryConglomerate() { @Override @@ -149,7 +153,8 @@ public class ServerManagerTest MoreExecutors.sameThreadExecutor(), new DefaultObjectMapper(), new LocalCacheProvider().get(), - new CacheConfig() + new CacheConfig(), + segmentManager ); loadQueryable("test", "1", new Interval("P1d/2011-04-01")); @@ -459,7 +464,7 @@ public class ServerManagerTest public void loadQueryable(String dataSource, String version, Interval interval) throws IOException { try { - serverManager.loadSegment( + segmentManager.loadSegment( new DataSegment( dataSource, interval, @@ -481,7 +486,7 @@ public class ServerManagerTest public void dropQueryable(String dataSource, String version, Interval interval) { try { - serverManager.dropSegment( + segmentManager.dropSegment( new DataSegment( dataSource, interval, diff --git a/server/src/test/java/io/druid/server/coordination/ServerTypeTest.java b/server/src/test/java/io/druid/server/coordination/ServerTypeTest.java new file mode 100644 index 00000000000..add988f3f13 --- /dev/null +++ b/server/src/test/java/io/druid/server/coordination/ServerTypeTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.server.coordination; + +import org.junit.Assert; +import org.junit.Test; + +public class ServerTypeTest +{ + @Test + public void testAssignable() + { + Assert.assertTrue(ServerType.HISTORICAL.isSegmentReplicationTarget()); + Assert.assertTrue(ServerType.BRIDGE.isSegmentReplicationTarget()); + Assert.assertFalse(ServerType.REALTIME.isSegmentReplicationTarget()); + } + + @Test + public void testFromString() + { + Assert.assertEquals(ServerType.HISTORICAL, ServerType.fromString("historical")); + Assert.assertEquals(ServerType.BRIDGE, ServerType.fromString("bridge")); + Assert.assertEquals(ServerType.REALTIME, ServerType.fromString("realtime")); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidName() + { + ServerType.fromString("invalid"); + } +} diff --git a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java index 0ed78d24e2a..1b8db71520e 100644 --- a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java @@ -43,6 +43,7 @@ import io.druid.query.NoopQueryRunnerFactoryConglomerate; import io.druid.segment.IndexIO; import io.druid.segment.loading.CacheTestSegmentLoader; import io.druid.segment.loading.SegmentLoaderConfig; +import io.druid.server.SegmentManager; import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig; import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.metrics.NoopServiceEmitter; @@ -81,7 +82,7 @@ public class ZkCoordinatorTest extends CuratorTestBase "dummyServer", "dummyHost", 0, - "dummyType", + "historical", "normal", 0 ); @@ -93,6 +94,7 @@ public class ZkCoordinatorTest extends CuratorTestBase private AtomicInteger announceCount; private ConcurrentSkipListSet segmentsAnnouncedByMe; private CacheTestSegmentLoader segmentLoader; + private SegmentManager segmentManager; private List scheduledRunnable; @Before @@ -116,16 +118,17 @@ public class ZkCoordinatorTest extends CuratorTestBase scheduledRunnable = Lists.newArrayList(); segmentLoader = new CacheTestSegmentLoader(); + segmentManager = new SegmentManager(segmentLoader); serverManager = new ServerManager( - segmentLoader, new NoopQueryRunnerFactoryConglomerate(), new NoopServiceEmitter(), MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor(), new DefaultObjectMapper(), new LocalCacheProvider().get(), - new CacheConfig() + new CacheConfig(), + segmentManager ); final ZkPathsConfig zkPaths = new ZkPathsConfig() @@ -220,7 +223,7 @@ public class ZkCoordinatorTest extends CuratorTestBase announcer, EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), curator, - serverManager, + segmentManager, new ScheduledExecutorFactory() { @Override @@ -388,12 +391,12 @@ public class ZkCoordinatorTest extends CuratorTestBase } checkCache(segments); - Assert.assertTrue(serverManager.getDataSourceCounts().isEmpty()); + Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); zkCoordinator.start(); - Assert.assertTrue(!serverManager.getDataSourceCounts().isEmpty()); + Assert.assertTrue(!segmentManager.getDataSourceCounts().isEmpty()); for (int i = 0; i < COUNT; ++i) { - Assert.assertEquals(11L, serverManager.getDataSourceCounts().get("test" + i).longValue()); - Assert.assertEquals(2L, serverManager.getDataSourceCounts().get("test_two" + i).longValue()); + Assert.assertEquals(11L, segmentManager.getDataSourceCounts().get("test" + 
i).longValue()); + Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue()); } Assert.assertEquals(13 * COUNT, announceCount.get()); zkCoordinator.stop(); @@ -512,11 +515,12 @@ public class ZkCoordinatorTest extends CuratorTestBase } ); binder.bind(DruidServerMetadata.class) - .toInstance(new DruidServerMetadata("dummyServer", "dummyHost", 0, "dummyType", "normal", 0)); + .toInstance(new DruidServerMetadata("dummyServer", "dummyHost", 0, "historical", "normal", 0)); binder.bind(DataSegmentAnnouncer.class).toInstance(announcer); binder.bind(DataSegmentServerAnnouncer.class).toInstance(EasyMock.createNiceMock(DataSegmentServerAnnouncer.class)); binder.bind(CuratorFramework.class).toInstance(curator); binder.bind(ServerManager.class).toInstance(serverManager); + binder.bind(SegmentManager.class).toInstance(segmentManager); binder.bind(ScheduledExecutorFactory.class).toInstance(ScheduledExecutors.createFactory(new Lifecycle())); } @@ -540,13 +544,13 @@ public class ZkCoordinatorTest extends CuratorTestBase } checkCache(segments); - Assert.assertTrue(serverManager.getDataSourceCounts().isEmpty()); + Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty()); zkCoordinator.start(); - Assert.assertTrue(!serverManager.getDataSourceCounts().isEmpty()); + Assert.assertTrue(!segmentManager.getDataSourceCounts().isEmpty()); for (int i = 0; i < COUNT; ++i) { - Assert.assertEquals(3L, serverManager.getDataSourceCounts().get("test" + i).longValue()); - Assert.assertEquals(2L, serverManager.getDataSourceCounts().get("test_two" + i).longValue()); + Assert.assertEquals(3L, segmentManager.getDataSourceCounts().get("test" + i).longValue()); + Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue()); } Assert.assertEquals(5 * COUNT, announceCount.get()); zkCoordinator.stop(); diff --git a/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java b/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java index ba6d8e1cd68..a1cc03ceb7e 100644 --- a/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java +++ b/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java @@ -106,7 +106,7 @@ public class BatchDataSegmentAnnouncerTest "id", "host", Long.MAX_VALUE, - "type", + "historical", "tier", 0 ), diff --git a/server/src/test/java/io/druid/server/coordinator/CostBalancerStrategyTest.java b/server/src/test/java/io/druid/server/coordinator/CostBalancerStrategyTest.java index 2b4a098a79c..6accfee6781 100644 --- a/server/src/test/java/io/druid/server/coordinator/CostBalancerStrategyTest.java +++ b/server/src/test/java/io/druid/server/coordinator/CostBalancerStrategyTest.java @@ -61,7 +61,7 @@ public class CostBalancerStrategyTest serverHolderList.add( new ServerHolder( new ImmutableDruidServer( - new DruidServerMetadata("DruidServer_Name_" + i, "localhost", 10000000L, "hot", "hot", 1), + new DruidServerMetadata("DruidServer_Name_" + i, "localhost", 10000000L, "historical", "hot", 1), 3000L, ImmutableMap.of("DUMMY", EasyMock.createMock(ImmutableDruidDataSource.class)), ImmutableMap.copyOf(segments) diff --git a/server/src/test/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategyTest.java b/server/src/test/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategyTest.java index b869c83ae1d..85ff7cf0865 100644 --- 
a/server/src/test/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategyTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategyTest.java @@ -60,7 +60,7 @@ public class DiskNormalizedCostBalancerStrategyTest serverHolderList.add( new ServerHolder( new ImmutableDruidServer( - new DruidServerMetadata("DruidServer_Name_" + i, "localhost", 10000000L, "hot", "hot", 1), + new DruidServerMetadata("DruidServer_Name_" + i, "localhost", 10000000L, "historical", "hot", 1), 3000L, ImmutableMap.of("DUMMY", EasyMock.createMock(ImmutableDruidDataSource.class)), ImmutableMap.copyOf(segments) diff --git a/server/src/test/java/io/druid/server/coordinator/DruidClusterTest.java b/server/src/test/java/io/druid/server/coordinator/DruidClusterTest.java new file mode 100644 index 00000000000..be4d5ada89b --- /dev/null +++ b/server/src/test/java/io/druid/server/coordinator/DruidClusterTest.java @@ -0,0 +1,204 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.MinMaxPriorityQueue; +import com.google.common.collect.Ordering; +import io.druid.client.ImmutableDruidDataSource; +import io.druid.client.ImmutableDruidServer; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.coordination.ServerType; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NoneShardSpec; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class DruidClusterTest +{ + private static final List segments = ImmutableList.of( + new DataSegment( + "test", + new Interval("2015-04-12/2015-04-13"), + "1", + ImmutableMap.of("containerName", "container1", "blobPath", "blobPath1"), + null, + null, + NoneShardSpec.instance(), + 0, + 1 + ), + new DataSegment( + "test", + new Interval("2015-04-12/2015-04-13"), + "1", + ImmutableMap.of("containerName", "container2", "blobPath", "blobPath2"), + null, + null, + NoneShardSpec.instance(), + 0, + 1 + ) + ); + + private static final Map dataSources = ImmutableMap.of( + "src1", + new ImmutableDruidDataSource( + "src1", + ImmutableMap.of(), + ImmutableMap.of(), + ImmutableSet.of() + ), + "src2", + new ImmutableDruidDataSource( + "src2", + ImmutableMap.of(), + ImmutableMap.of(), + ImmutableSet.of() + ) + ); + + private static final ServerHolder newRealtime = new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("name1", "host2", 100L, 
ServerType.REALTIME.name(), "tier1", 0), + 0L, + ImmutableMap.of( + "src1", + dataSources.get("src1") + ), + ImmutableMap.of( + "segment1", + segments.get(0) + ) + ), + new LoadQueuePeonTester() + ); + + private static final ServerHolder newHistorical = new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("name1", "host2", 100L, ServerType.HISTORICAL.name(), "tier1", 0), + 0L, + ImmutableMap.of( + "src1", + dataSources.get("src1") + ), + ImmutableMap.of( + "segment1", + segments.get(0) + ) + ), + new LoadQueuePeonTester() + ); + + private DruidCluster cluster; + + @Before + public void setup() + { + cluster = new DruidCluster( + ImmutableSet.of( + new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("name1", "host1", 100L, ServerType.REALTIME.name(), "tier1", 0), + 0L, + ImmutableMap.of( + "src1", + dataSources.get("src1") + ), + ImmutableMap.of( + "segment1", + segments.get(0) + ) + ), + new LoadQueuePeonTester() + ) + ), + ImmutableMap.of( + "tier1", + MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( + ImmutableList.of( + new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("name1", "host1", 100L, ServerType.HISTORICAL.name(), "tier1", 0), + 0L, + ImmutableMap.of( + "src1", + dataSources.get("src1") + ), + ImmutableMap.of( + "segment1", + segments.get(0) + ) + ), + new LoadQueuePeonTester() + ) + ) + ) + ) + ); + } + + @Test + public void testAdd() + { + Assert.assertEquals(1, cluster.getHistoricals().values().stream().mapToInt(Collection::size).sum()); + Assert.assertEquals(1, cluster.getRealtimes().size()); + + cluster.add(newRealtime); + Assert.assertEquals(1, cluster.getHistoricals().values().stream().mapToInt(Collection::size).sum()); + Assert.assertEquals(2, cluster.getRealtimes().size()); + + cluster.add(newHistorical); + Assert.assertEquals(2, cluster.getHistoricals().values().stream().mapToInt(Collection::size).sum()); + Assert.assertEquals(2, cluster.getRealtimes().size()); + } + + @Test + public void testGetAllServers() + { + cluster.add(newRealtime); + cluster.add(newHistorical); + final Collection<ServerHolder> allServers = cluster.getAllServers(); + Assert.assertEquals(4, allServers.size()); + Assert.assertTrue(allServers.containsAll(cluster.getRealtimes())); + Assert.assertTrue( + allServers.containsAll( + cluster.getHistoricals().values().stream().flatMap(Collection::stream).collect(Collectors.toList()) + ) + ); + } + + @Test + public void testIsEmpty() + { + final DruidCluster emptyCluster = new DruidCluster(); + Assert.assertFalse(cluster.isEmpty()); + Assert.assertTrue(emptyCluster.isEmpty()); + } +} diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerProfiler.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerProfiler.java index 523a7b08e3f..afda7460561 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerProfiler.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerProfiler.java @@ -134,6 +134,7 @@ public class DruidCoordinatorBalancerProfiler DruidCoordinatorRuntimeParams.newBuilder() .withDruidCluster( new DruidCluster( + null, ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of( "normal", MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator) @@ -161,6 +162,7 @@ .withSegmentReplicantLookup( SegmentReplicantLookup.make( new DruidCluster( + null, ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of( "normal",
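// Reviewer note: the `null` being threaded into these DruidCluster constructions
// fills the constructor's new first parameter — per DruidClusterTest above, the
// Set<ServerHolder> of realtime servers backing getRealtimes() — and coordinator
// tests that only exercise historical tiers simply pass null. A populated call,
// as a hedged sketch rather than code from this patch, would look like:
//   new DruidCluster(ImmutableSet.of(realtimeHolder), ImmutableMap.of("tier1", historicalQueue));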
MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator) @@ -216,6 +218,7 @@ public class DruidCoordinatorBalancerProfiler DruidCoordinatorRuntimeParams.newBuilder() .withDruidCluster( new DruidCluster( + null, ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of( "normal", MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator) diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java index 29ca9b4d33c..d699f2369df 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java @@ -179,6 +179,7 @@ public class DruidCoordinatorBalancerTest DruidCoordinatorRuntimeParams.newBuilder() .withDruidCluster( new DruidCluster( + null, ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of( "normal", MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator) @@ -260,6 +261,7 @@ public class DruidCoordinatorBalancerTest DruidCoordinatorRuntimeParams.newBuilder() .withDruidCluster( new DruidCluster( + null, ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of( "normal", MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator) @@ -354,6 +356,7 @@ public class DruidCoordinatorBalancerTest DruidCoordinatorRuntimeParams.newBuilder() .withDruidCluster( new DruidCluster( + null, ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of( "normal", MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator) diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java index f82cb5eb850..5d47d8dae14 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java @@ -130,6 +130,7 @@ public class DruidCoordinatorRuleRunnerTest EasyMock.replay(databaseRuleManager); DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "hot", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -237,6 +238,7 @@ public class DruidCoordinatorRuleRunnerTest EasyMock.replay(databaseRuleManager); DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "hot", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -349,6 +351,7 @@ public class DruidCoordinatorRuleRunnerTest } DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "hot", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -430,6 +433,7 @@ public class DruidCoordinatorRuleRunnerTest EasyMock.replay(databaseRuleManager); DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "normal", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -489,6 +493,7 @@ public class DruidCoordinatorRuleRunnerTest EasyMock.replay(databaseRuleManager); DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "normal", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -562,6 +567,7 @@ public class DruidCoordinatorRuleRunnerTest } DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "normal", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -642,6 +648,7 @@ public class DruidCoordinatorRuleRunnerTest } DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -728,6 +735,7 @@ public class DruidCoordinatorRuleRunnerTest } DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "hot", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -815,6 +823,7 @@ public class DruidCoordinatorRuleRunnerTest server2.addDataSegment(segment.getIdentifier(), segment); } DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "hot", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -916,6 +925,7 @@ public class DruidCoordinatorRuleRunnerTest EasyMock.replay(anotherMockPeon); DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "normal", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -988,6 +998,7 @@ public class DruidCoordinatorRuleRunnerTest EasyMock.replay(databaseRuleManager); DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "hot", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -1112,6 +1123,7 @@ public class DruidCoordinatorRuleRunnerTest EasyMock.replay(databaseRuleManager); DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "hot", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -1231,6 +1243,7 @@ public class DruidCoordinatorRuleRunnerTest } DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "normal", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -1317,6 +1330,7 @@ public class DruidCoordinatorRuleRunnerTest EasyMock.replay(databaseRuleManager); DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( DruidServer.DEFAULT_TIER, MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java index 3f4a6b5ba5d..0ca07566194 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java @@ -215,7 +215,7 @@ public class DruidCoordinatorTest extends CuratorTestBase EasyMock.replay(metadataRuleManager); EasyMock.expect(druidServer.toImmutableDruidServer()).andReturn( new ImmutableDruidServer( - new DruidServerMetadata("from", null, 5L, null, null, 0), + new DruidServerMetadata("from", null, 5L, "historical", null, 0), 1L, null, ImmutableMap.of("dummySegment", segment) @@ -226,7 +226,7 @@ public class DruidCoordinatorTest extends CuratorTestBase druidServer2 = EasyMock.createMock(DruidServer.class); EasyMock.expect(druidServer2.toImmutableDruidServer()).andReturn( new ImmutableDruidServer( - new DruidServerMetadata("to", null, 5L, null, null, 0), + new DruidServerMetadata("to", null, 5L, "historical", null, 0), 1L, null, ImmutableMap.of("dummySegment2", segment) diff --git a/server/src/test/java/io/druid/server/coordinator/ServerHolderTest.java b/server/src/test/java/io/druid/server/coordinator/ServerHolderTest.java new file mode 100644 index 00000000000..666009d2d49 --- /dev/null +++ b/server/src/test/java/io/druid/server/coordinator/ServerHolderTest.java @@ -0,0 +1,246 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.druid.client.ImmutableDruidDataSource; +import io.druid.client.ImmutableDruidServer; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.coordination.ServerType; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NoneShardSpec; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +public class ServerHolderTest +{ + private static final List<DataSegment> segments = ImmutableList.of( + new DataSegment( + "test", + new Interval("2015-04-12/2015-04-13"), + "1", + ImmutableMap.of("containerName", "container1", "blobPath", "blobPath1"), + null, + null, + NoneShardSpec.instance(), + 0, + 1 + ), + new DataSegment( + "test", + new Interval("2015-04-12/2015-04-13"), + "1", + ImmutableMap.of("containerName", "container2", "blobPath", "blobPath2"), + null, + null, + NoneShardSpec.instance(), + 0, + 1 + ) + ); + + private static final Map<String, ImmutableDruidDataSource> dataSources = ImmutableMap.of( + "src1", + new ImmutableDruidDataSource( + "src1", + ImmutableMap.of(), + ImmutableMap.of(), + ImmutableSet.of() + ), + "src2", + new ImmutableDruidDataSource( + "src2", + ImmutableMap.of(), + ImmutableMap.of(), + ImmutableSet.of() + ) + ); + + @Test + public void testCompareTo() throws Exception + { + // available size of 100 + final ServerHolder h1 = new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("name1", "host1", 100L, ServerType.HISTORICAL.name(), "tier1", 0), + 0L, + ImmutableMap.of( + "src1", + dataSources.get("src1") + ), + ImmutableMap.of( + "segment1", + segments.get(0) + ) + ), + new LoadQueuePeonTester() + ); + + // available size of 100 + final ServerHolder h2 = new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("name1", "host1", 200L, ServerType.HISTORICAL.name(), "tier1", 0), + 100L, + ImmutableMap.of( + "src1", + dataSources.get("src1") + ), + ImmutableMap.of( + "segment1", + segments.get(0) + ) + ), + new LoadQueuePeonTester() + ); + + // available size of 10 + final ServerHolder h3 = new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("name1", "host1", 1000L, ServerType.HISTORICAL.name(), "tier1", 0), + 990L, + ImmutableMap.of( + "src1", + dataSources.get("src1") + ), + ImmutableMap.of( + "segment1", + segments.get(0) + ) + ), + new LoadQueuePeonTester() + ); + + // available size of 50 + final ServerHolder h4 = new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("name1", "host1", 50L, ServerType.HISTORICAL.name(), "tier1", 0), + 0L, + ImmutableMap.of( + "src1", + dataSources.get("src1") + ), + ImmutableMap.of( + "segment1", + segments.get(0) + ) + ), + new LoadQueuePeonTester() + ); + + Assert.assertEquals(0, h1.compareTo(h2)); +
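// Reviewer note: together with the fixture comments above, these assertions pin
// down ServerHolder's ordering as ascending available size (maxSize - currSize):
// h1 and h2 both have 100 bytes free and compare equal, while h3 (10 free) sorts
// below both h1 (100 free) and h4 (50 free). A comparator consistent with the
// checks — a hedged sketch, not the patch's actual implementation — would be:
//   Long.compare(this.getAvailableSize(), that.getAvailableSize());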
Assert.assertEquals(-1, h3.compareTo(h1)); + Assert.assertEquals(-1, h3.compareTo(h4)); + } + + @Test + public void testEquals() throws Exception + { + final ServerHolder h1 = new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("name1", "host1", 100L, ServerType.HISTORICAL.name(), "tier1", 0), + 0L, + ImmutableMap.of( + "src1", + dataSources.get("src1") + ), + ImmutableMap.of( + "segment1", + segments.get(0) + ) + ), + new LoadQueuePeonTester() + ); + + final ServerHolder h2 = new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("name2", "host1", 200L, ServerType.HISTORICAL.name(), "tier1", 0), + 100L, + ImmutableMap.of( + "src1", + dataSources.get("src1") + ), + ImmutableMap.of( + "segment1", + segments.get(0) + ) + ), + new LoadQueuePeonTester() + ); + + final ServerHolder h3 = new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("name1", "host2", 200L, ServerType.HISTORICAL.name(), "tier1", 0), + 100L, + ImmutableMap.of( + "src1", + dataSources.get("src1") + ), + ImmutableMap.of( + "segment1", + segments.get(0) + ) + ), + new LoadQueuePeonTester() + ); + + final ServerHolder h4 = new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("name1", "host1", 200L, ServerType.HISTORICAL.name(), "tier2", 0), + 100L, + ImmutableMap.of( + "src1", + dataSources.get("src1") + ), + ImmutableMap.of( + "segment1", + segments.get(0) + ) + ), + new LoadQueuePeonTester() + ); + + final ServerHolder h5 = new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("name1", "host1", 100L, ServerType.REALTIME.name(), "tier1", 0), + 0L, + ImmutableMap.of( + "src1", + dataSources.get("src1") + ), + ImmutableMap.of( + "segment1", + segments.get(0) + ) + ), + new LoadQueuePeonTester() + ); + + Assert.assertEquals(h1, h2); + Assert.assertNotEquals(h1, h3); + Assert.assertNotEquals(h1, h4); + Assert.assertNotEquals(h1, h5); + } +} diff --git a/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowedTest.java b/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowedTest.java index 69396f283a1..9bd095be152 100644 --- a/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowedTest.java +++ b/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowedTest.java @@ -71,6 +71,7 @@ public class DruidCoordinatorCleanupOvershadowedTest availableSegments = ImmutableList.of(segmentV1, segmentV0, segmentV2); druidCluster = new DruidCluster( + null, ImmutableMap.of("normal", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( Collections.singletonList(new ServerHolder(druidServer, mockPeon)) ))); diff --git a/server/src/test/java/io/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java b/server/src/test/java/io/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java index 1255a5df5e0..e2a2d9bf52b 100644 --- a/server/src/test/java/io/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java +++ b/server/src/test/java/io/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java @@ -188,6 +188,7 @@ public class BroadcastDistributionRuleTest ); druidCluster = new DruidCluster( + null, ImmutableMap.of( "hot", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( diff --git a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java index 
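Reviewer note on testEquals above: h2 differs from h1 only in name and sizes yet is expected to be equal, while changing the host (h3), tier (h4), or server type (h5) breaks equality — so ServerHolder equality is evidently keyed on the underlying server's host, tier, and type rather than its name or capacity. A sketch consistent with those assertions (the field and accessor names are assumptions, not the patch's code):

@Override
public boolean equals(Object o)
{
  if (this == o) {
    return true;
  }
  if (o == null || getClass() != o.getClass()) {
    return false;
  }
  final ServerHolder that = (ServerHolder) o;
  // Name and size are deliberately excluded, matching h1.equals(h2) above.
  return server.getHost().equals(that.server.getHost())
         && server.getTier().equals(that.server.getTier())
         && server.getType() == that.server.getType();
}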
33597215f81..4001aea7954 100644 --- a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java +++ b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java @@ -158,6 +158,7 @@ public class LoadRuleTest }; DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "hot", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -282,6 +283,7 @@ public class LoadRuleTest ); server2.addDataSegment(segment.getIdentifier(), segment); DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "hot", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -374,6 +376,7 @@ public class LoadRuleTest }; DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "hot", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( @@ -482,6 +485,7 @@ public class LoadRuleTest server2.addDataSegment(segment.getIdentifier(), segment); DruidCluster druidCluster = new DruidCluster( + null, ImmutableMap.of( "hot", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create( diff --git a/server/src/test/java/io/druid/server/http/ServersResourceTest.java b/server/src/test/java/io/druid/server/http/ServersResourceTest.java index 0695bf6750b..a8dd7d0a8f8 100644 --- a/server/src/test/java/io/druid/server/http/ServersResourceTest.java +++ b/server/src/test/java/io/druid/server/http/ServersResourceTest.java @@ -41,7 +41,7 @@ public class ServersResourceTest { @Before public void setUp() { - DruidServer dummyServer = new DruidServer("dummy", "host", 1234L, "type", "tier", 0); + DruidServer dummyServer = new DruidServer("dummy", "host", 1234L, "historical", "tier", 0); DataSegment segment = DataSegment.builder() .dataSource("dataSource") .interval(new Interval("2016-03-22T14Z/2016-03-22T15Z")) @@ -65,7 +65,7 @@ public class ServersResourceTest { String result = objectMapper.writeValueAsString(res.getEntity()); String expected = "[{\"host\":\"host\"," + "\"maxSize\":1234," - + "\"type\":\"type\"," + + "\"type\":\"HISTORICAL\"," + "\"tier\":\"tier\"," + "\"priority\":0," + "\"segments\":{\"dataSource_2016-03-22T14:00:00.000Z_2016-03-22T15:00:00.000Z_v0\":" @@ -80,7 +80,7 @@ public class ServersResourceTest { { Response res = serversResource.getClusterServers(null, "simple"); String result = objectMapper.writeValueAsString(res.getEntity()); - String expected = "[{\"host\":\"host\",\"tier\":\"tier\",\"type\":\"type\",\"priority\":0,\"currSize\":1,\"maxSize\":1234}]"; + String expected = "[{\"host\":\"host\",\"tier\":\"tier\",\"type\":\"HISTORICAL\",\"priority\":0,\"currSize\":1,\"maxSize\":1234}]"; Assert.assertEquals(expected, result); } @@ -91,7 +91,7 @@ public class ServersResourceTest { String result = objectMapper.writeValueAsString(res.getEntity()); String expected = "{\"host\":\"host\"," + "\"maxSize\":1234," - + "\"type\":\"type\"," + + "\"type\":\"HISTORICAL\"," + "\"tier\":\"tier\"," + "\"priority\":0," + "\"segments\":{\"dataSource_2016-03-22T14:00:00.000Z_2016-03-22T15:00:00.000Z_v0\":" @@ -106,7 +106,7 @@ public class ServersResourceTest { { Response res = serversResource.getServer(server.getName(), "simple"); String result = objectMapper.writeValueAsString(res.getEntity()); - String expected = "{\"host\":\"host\",\"tier\":\"tier\",\"type\":\"type\",\"priority\":0,\"currSize\":1,\"maxSize\":1234}"; + String expected = "{\"host\":\"host\",\"tier\":\"tier\",\"type\":\"HISTORICAL\",\"priority\":0,\"currSize\":1,\"maxSize\":1234}"; Assert.assertEquals(expected, result); } diff --git 
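Reviewer note: the ServersResource expectations above change "type":"type" to "type":"HISTORICAL", i.e. the enum constant's name. That matches Jackson's default enum handling, which serializes via name(); whether ServerType additionally declares @JsonValue is not visible in this patch, so the sketch below leans on the default behavior only.

import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.server.coordination.ServerType;

class ServerTypeJsonSketch
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();
    // Default Jackson enum serialization writes the constant name, quoted.
    System.out.println(mapper.writeValueAsString(ServerType.HISTORICAL)); // "HISTORICAL"
  }
}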
a/server/src/test/java/io/druid/server/metrics/HistoricalMetricsMonitorTest.java b/server/src/test/java/io/druid/server/metrics/HistoricalMetricsMonitorTest.java index 5e862adea0e..0acc6bd6f08 100644 --- a/server/src/test/java/io/druid/server/metrics/HistoricalMetricsMonitorTest.java +++ b/server/src/test/java/io/druid/server/metrics/HistoricalMetricsMonitorTest.java @@ -27,7 +27,7 @@ import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEventBuilder; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.client.DruidServerConfig; -import io.druid.server.coordination.ServerManager; +import io.druid.server.SegmentManager; import io.druid.server.coordination.ZkCoordinator; import io.druid.timeline.DataSegment; import org.easymock.Capture; @@ -47,7 +47,7 @@ import java.util.Map; public class HistoricalMetricsMonitorTest extends EasyMockSupport { private DruidServerConfig druidServerConfig; - private ServerManager serverManager; + private SegmentManager segmentManager; private ZkCoordinator zkCoordinator; private ServiceEmitter serviceEmitter; @@ -55,7 +55,7 @@ public class HistoricalMetricsMonitorTest extends EasyMockSupport public void setUp() { druidServerConfig = EasyMock.createStrictMock(DruidServerConfig.class); - serverManager = EasyMock.createStrictMock(ServerManager.class); + segmentManager = EasyMock.createStrictMock(SegmentManager.class); zkCoordinator = EasyMock.createStrictMock(ZkCoordinator.class); serviceEmitter = EasyMock.createStrictMock(ServiceEmitter.class); } @@ -84,17 +84,17 @@ public class HistoricalMetricsMonitorTest extends EasyMockSupport EasyMock.expect(zkCoordinator.getPendingDeleteSnapshot()).andReturn(ImmutableList.of(dataSegment)).once(); EasyMock.expect(druidServerConfig.getTier()).andReturn(tier).once(); EasyMock.expect(druidServerConfig.getPriority()).andReturn(priority).once(); - EasyMock.expect(serverManager.getDataSourceSizes()).andReturn(ImmutableMap.of(dataSource, size)); + EasyMock.expect(segmentManager.getDataSourceSizes()).andReturn(ImmutableMap.of(dataSource, size)); EasyMock.expect(druidServerConfig.getTier()).andReturn(tier).once(); EasyMock.expect(druidServerConfig.getPriority()).andReturn(priority).once(); EasyMock.expect(druidServerConfig.getMaxSize()).andReturn(maxSize).times(2); - EasyMock.expect(serverManager.getDataSourceCounts()).andReturn(ImmutableMap.of(dataSource, 1L)); + EasyMock.expect(segmentManager.getDataSourceCounts()).andReturn(ImmutableMap.of(dataSource, 1L)); EasyMock.expect(druidServerConfig.getTier()).andReturn(tier).once(); EasyMock.expect(druidServerConfig.getPriority()).andReturn(priority).once(); final HistoricalMetricsMonitor monitor = new HistoricalMetricsMonitor( druidServerConfig, - serverManager, + segmentManager, zkCoordinator ); @@ -102,9 +102,9 @@ public class HistoricalMetricsMonitorTest extends EasyMockSupport serviceEmitter.emit(EasyMock.capture(eventCapture)); EasyMock.expectLastCall().times(5); - EasyMock.replay(druidServerConfig, serverManager, zkCoordinator, serviceEmitter); + EasyMock.replay(druidServerConfig, segmentManager, zkCoordinator, serviceEmitter); monitor.doMonitor(serviceEmitter); - EasyMock.verify(druidServerConfig, serverManager, zkCoordinator, serviceEmitter); + EasyMock.verify(druidServerConfig, segmentManager, zkCoordinator, serviceEmitter); final String host = "host"; final String service = "service"; diff --git a/services/src/main/java/io/druid/guice/RealtimeModule.java b/services/src/main/java/io/druid/guice/RealtimeModule.java index 
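Reviewer note: HistoricalMetricsMonitorTest above swaps every ServerManager expectation for SegmentManager, consistent with the per-datasource bookkeeping moving to io.druid.server.SegmentManager. The two accessors the mocks rely on are summarized below; their value types are inferred from the ImmutableMap instances the expectations return, not read from SegmentManager's source.

import java.util.Map;

// Hypothetical summary of the mocked surface; method names match the test.
interface SegmentStatsSketch
{
  Map<String, Long> getDataSourceSizes();  // datasource -> total segment bytes served
  Map<String, Long> getDataSourceCounts(); // datasource -> number of segments served
}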
45861fb65dc..3baaddefa25 100644 --- a/services/src/main/java/io/druid/guice/RealtimeModule.java +++ b/services/src/main/java/io/druid/guice/RealtimeModule.java @@ -40,6 +40,7 @@ import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierC import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierFactory; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.QueryResource; +import io.druid.server.coordination.ZkCoordinator; import io.druid.server.http.SegmentListerResource; import io.druid.server.initialization.jetty.JettyServerInitializer; import io.druid.server.metrics.QueryCountStatsProvider; @@ -109,5 +110,8 @@ public class RealtimeModule implements Module Jerseys.addResource(binder, SegmentListerResource.class); LifecycleModule.register(binder, QueryResource.class); LifecycleModule.register(binder, Server.class); + + binder.bind(ZkCoordinator.class).in(ManageLifecycle.class); + LifecycleModule.register(binder, ZkCoordinator.class); } } diff --git a/sql/src/test/java/io/druid/sql/calcite/util/TestServerInventoryView.java b/sql/src/test/java/io/druid/sql/calcite/util/TestServerInventoryView.java index 193cc91c18c..d61a93853fa 100644 --- a/sql/src/test/java/io/druid/sql/calcite/util/TestServerInventoryView.java +++ b/sql/src/test/java/io/druid/sql/calcite/util/TestServerInventoryView.java @@ -51,7 +51,7 @@ public class TestServerInventoryView implements TimelineServerView @Override public void registerSegmentCallback(Executor exec, final SegmentCallback callback) { - final DruidServerMetadata dummyServer = new DruidServerMetadata("dummy", "dummy", 0, "dummy", "dummy", 0); + final DruidServerMetadata dummyServer = new DruidServerMetadata("dummy", "dummy", 0, "historical", "dummy", 0); for (final DataSegment segment : segments) { exec.execute(
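// Reviewer note: the RealtimeModule hunk wires ZkCoordinator into realtime nodes via
// the Guice lifecycle idiom used elsewhere in these modules — bind(...).in(ManageLifecycle.class)
// scopes the coordinator to the managed lifecycle, while LifecycleModule.register(...)
// has the injector construct and start it eagerly; one line without the other would
// either leave the binding unscoped or never instantiate it (a general statement about
// the idiom, not a new API). The TestServerInventoryView change above is the same
// type-string tightening as the other tests: "dummy" is not a recognized ServerType,
// "historical" is.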