Make realtimes available for loading segments (#4148)

* Add ServerType

* Add realtimes to DruidCluster

* fix test fails

* Add SegmentManager

* Fix equals and hashCode of ServerHolder

* Address comments and add more tests

* Address comments
This commit is contained in:
Jihoon Son 2017-05-19 00:03:39 +09:00 committed by Roman Leventov
parent 733dfc9b30
commit 5c0a7ad2f8
44 changed files with 988 additions and 267 deletions

View File

@ -272,7 +272,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
Hasher hasher = Hashing.sha1().newHasher();
boolean hasOnlyHistoricalSegments = true;
for (Pair<ServerSelector, SegmentDescriptor> p : segments) {
if (!p.lhs.pick().getServer().isAssignable()) {
if (!p.lhs.pick().getServer().segmentReplicatable()) {
hasOnlyHistoricalSegments = false;
break;
}
@ -429,7 +429,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final MultipleSpecificSegmentSpec segmentSpec = new MultipleSpecificSegmentSpec(descriptors);
final Sequence<T> resultSeqToAdd;
if (!server.isAssignable() || !populateCache || isBySegment) { // Direct server queryable
if (!server.segmentReplicatable() || !populateCache || isBySegment) { // Direct server queryable
if (!isBySegment) {
resultSeqToAdd = clientQueryable.run(queryPlus.withQuerySegmentSpec(segmentSpec), responseContext);
} else {

View File

@ -28,6 +28,7 @@ import com.google.common.collect.Maps;
import io.druid.java.util.common.logger.Logger;
import io.druid.server.DruidNode;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.coordination.ServerType;
import io.druid.timeline.DataSegment;
import java.util.Collections;
@ -111,7 +112,7 @@ public class DruidServer implements Comparable
return metadata.getMaxSize();
}
public String getType()
public ServerType getType()
{
return metadata.getType();
}
@ -121,9 +122,9 @@ public class DruidServer implements Comparable
return metadata.getTier();
}
public boolean isAssignable()
public boolean segmentReplicatable()
{
return metadata.isAssignable();
return metadata.segmentReplicatable();
}
public int getPriority()

View File

@ -72,4 +72,15 @@ public class ImmutableDruidDataSource
{
return segmentsHolder;
}
@Override
public String toString()
{
// partitionNames is intentionally ignored because it is usually large
return "ImmutableDruidDataSource{"
+ "name='" + name
+ "', segments='" + segmentsHolder
+ "', properties='" + properties
+ "'}";
}
}

View File

@ -21,6 +21,7 @@ package io.druid.client;
import com.google.common.collect.ImmutableMap;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.coordination.ServerType;
import io.druid.timeline.DataSegment;
import java.util.Map;
@ -72,7 +73,7 @@ public class ImmutableDruidServer
return metadata.getMaxSize();
}
public String getType()
public ServerType getType()
{
return metadata.getType();
}
@ -106,4 +107,15 @@ public class ImmutableDruidServer
{
return segments;
}
@Override
public String toString()
{
// segments is intentionally ignored because it is usually large
return "ImmutableDruidServer{"
+ "meta='" + metadata
+ "', size='" + currSize
+ "', sources='" + dataSources
+ "'}";
}
}

View File

@ -20,6 +20,7 @@
package io.druid.segment.realtime;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
@ -84,6 +85,7 @@ public class RealtimeManager implements QuerySegmentWalker
this(fireDepartments, conglomerate, serverAnnouncer, Maps.newHashMap());
}
@VisibleForTesting
RealtimeManager(
List<FireDepartment> fireDepartments,
QueryRunnerFactoryConglomerate conglomerate,
@ -94,7 +96,7 @@ public class RealtimeManager implements QuerySegmentWalker
this.fireDepartments = fireDepartments;
this.conglomerate = conglomerate;
this.serverAnnouncer = serverAnnouncer;
this.chiefs = chiefs;
this.chiefs = chiefs == null ? Maps.newHashMap() : Maps.newHashMap(chiefs);
}
@LifecycleStart

View File

@ -148,7 +148,7 @@ public class CoordinatorBasedSegmentHandoffNotifier implements SegmentHandoffNot
@Override
public boolean apply(DruidServerMetadata input)
{
return input.isAssignable();
return input.segmentReplicatable();
}
}
)) {

View File

@ -0,0 +1,196 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import io.druid.collections.CountingMap;
import io.druid.segment.ReferenceCountingSegment;
import io.druid.segment.Segment;
import io.druid.segment.loading.SegmentLoader;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk;
import io.druid.timeline.partition.PartitionHolder;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class SegmentManager
{
private static final EmittingLogger log = new EmittingLogger(SegmentManager.class);
private final Object lock = new Object();
private final SegmentLoader segmentLoader;
private final Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> dataSources = new HashMap<>();
private final CountingMap<String> dataSourceSizes = new CountingMap<>();
private final CountingMap<String> dataSourceCounts = new CountingMap<>();
@Inject
public SegmentManager(
SegmentLoader segmentLoader
)
{
this.segmentLoader = segmentLoader;
}
public Map<String, Long> getDataSourceSizes()
{
synchronized (dataSourceSizes) {
return dataSourceSizes.snapshot();
}
}
public Map<String, Long> getDataSourceCounts()
{
synchronized (dataSourceCounts) {
return dataSourceCounts.snapshot();
}
}
public boolean isSegmentCached(final DataSegment segment) throws SegmentLoadingException
{
return segmentLoader.isSegmentLoaded(segment);
}
public VersionedIntervalTimeline<String, ReferenceCountingSegment> getTimeline(String dataSource)
{
synchronized (lock) {
return dataSources.get(dataSource);
}
}
/**
* Load a single segment.
*
* @param segment segment to load
*
* @return true if the segment was newly loaded, false if it was already loaded
*
* @throws SegmentLoadingException if the segment cannot be loaded
*/
public boolean loadSegment(final DataSegment segment) throws SegmentLoadingException
{
final Segment adapter = getAdapter(segment);
synchronized (lock) {
final String dataSource = segment.getDataSource();
final VersionedIntervalTimeline<String, ReferenceCountingSegment> loadedIntervals = dataSources.computeIfAbsent(
dataSource,
k -> new VersionedIntervalTimeline<>(Ordering.natural())
);
final PartitionHolder<ReferenceCountingSegment> entry = loadedIntervals.findEntry(
segment.getInterval(),
segment.getVersion()
);
if ((entry != null) && (entry.getChunk(segment.getShardSpec().getPartitionNum()) != null)) {
log.warn("Told to load a adapter for a segment[%s] that already exists", segment.getIdentifier());
return false;
}
loadedIntervals.add(
segment.getInterval(),
segment.getVersion(),
segment.getShardSpec().createChunk(new ReferenceCountingSegment(adapter))
);
synchronized (dataSourceSizes) {
dataSourceSizes.add(dataSource, segment.getSize());
}
synchronized (dataSourceCounts) {
dataSourceCounts.add(dataSource, 1L);
}
return true;
}
}
private Segment getAdapter(final DataSegment segment) throws SegmentLoadingException
{
final Segment adapter;
try {
adapter = segmentLoader.getSegment(segment);
}
catch (SegmentLoadingException e) {
try {
segmentLoader.cleanup(segment);
}
catch (SegmentLoadingException e1) {
e.addSuppressed(e1);
}
throw e;
}
if (adapter == null) {
throw new SegmentLoadingException("Null adapter from loadSpec[%s]", segment.getLoadSpec());
}
return adapter;
}
public void dropSegment(final DataSegment segment) throws SegmentLoadingException
{
String dataSource = segment.getDataSource();
synchronized (lock) {
VersionedIntervalTimeline<String, ReferenceCountingSegment> loadedIntervals = dataSources.get(dataSource);
if (loadedIntervals == null) {
log.info("Told to delete a queryable for a dataSource[%s] that doesn't exist.", dataSource);
return;
}
PartitionChunk<ReferenceCountingSegment> removed = loadedIntervals.remove(
segment.getInterval(),
segment.getVersion(),
segment.getShardSpec().createChunk(null)
);
ReferenceCountingSegment oldQueryable = (removed == null) ? null : removed.getObject();
if (oldQueryable != null) {
synchronized (dataSourceSizes) {
dataSourceSizes.add(dataSource, -segment.getSize());
}
synchronized (dataSourceCounts) {
dataSourceCounts.add(dataSource, -1L);
}
try {
log.info("Attempting to close segment %s", segment.getIdentifier());
oldQueryable.close();
}
catch (IOException e) {
log.makeAlert(e, "Exception closing segment")
.addData("dataSource", dataSource)
.addData("segmentId", segment.getIdentifier())
.emit();
}
} else {
log.info(
"Told to delete a queryable on dataSource[%s] for interval[%s] and version [%s] that I don't have.",
dataSource,
segment.getInterval(),
segment.getVersion()
);
}
}
segmentLoader.cleanup(segment);
}
}

View File

@ -310,7 +310,7 @@ public class BatchDataSegmentAnnouncer implements DataSegmentAnnouncer
return makeServedSegmentPath(
UUIDUtils.generateUuid(
server.getHost(),
server.getType(),
server.getType().toString(),
server.getTier(),
new DateTime().toString()
)

View File

@ -30,7 +30,7 @@ public class DruidServerMetadata
private final String host;
private final long maxSize;
private final String tier;
private final String type;
private final ServerType type;
private final int priority;
@JsonCreator
@ -47,7 +47,7 @@ public class DruidServerMetadata
this.host = host;
this.maxSize = maxSize;
this.tier = tier;
this.type = type;
this.type = ServerType.fromString(type);
this.priority = priority;
}
@ -76,7 +76,7 @@ public class DruidServerMetadata
}
@JsonProperty
public String getType()
public ServerType getType()
{
return type;
}
@ -87,9 +87,9 @@ public class DruidServerMetadata
return priority;
}
public boolean isAssignable()
public boolean segmentReplicatable()
{
return getType().equalsIgnoreCase("historical") || getType().equalsIgnoreCase("bridge");
return type.isSegmentReplicationTarget();
}
@Override

View File

@ -22,14 +22,12 @@ package io.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.CachingQueryRunner;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.collections.CountingMap;
import io.druid.guice.annotations.BackgroundCaching;
import io.druid.guice.annotations.Processing;
import io.druid.guice.annotations.Smile;
@ -55,10 +53,7 @@ import io.druid.query.TableDataSource;
import io.druid.query.spec.SpecificSegmentQueryRunner;
import io.druid.query.spec.SpecificSegmentSpec;
import io.druid.segment.ReferenceCountingSegment;
import io.druid.segment.Segment;
import io.druid.segment.loading.SegmentLoader;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import io.druid.server.SegmentManager;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk;
@ -66,11 +61,8 @@ import io.druid.timeline.partition.PartitionHolder;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
@ -79,32 +71,27 @@ import java.util.concurrent.atomic.AtomicLong;
public class ServerManager implements QuerySegmentWalker
{
private static final EmittingLogger log = new EmittingLogger(ServerManager.class);
private final Object lock = new Object();
private final SegmentLoader segmentLoader;
private final QueryRunnerFactoryConglomerate conglomerate;
private final ServiceEmitter emitter;
private final ExecutorService exec;
private final ExecutorService cachingExec;
private final Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> dataSources;
private final CountingMap<String> dataSourceSizes = new CountingMap<String>();
private final CountingMap<String> dataSourceCounts = new CountingMap<String>();
private final Cache cache;
private final ObjectMapper objectMapper;
private final CacheConfig cacheConfig;
private final SegmentManager segmentManager;
@Inject
public ServerManager(
SegmentLoader segmentLoader,
QueryRunnerFactoryConglomerate conglomerate,
ServiceEmitter emitter,
@Processing ExecutorService exec,
@BackgroundCaching ExecutorService cachingExec,
@Smile ObjectMapper objectMapper,
Cache cache,
CacheConfig cacheConfig
CacheConfig cacheConfig,
SegmentManager segmentManager
)
{
this.segmentLoader = segmentLoader;
this.conglomerate = conglomerate;
this.emitter = emitter;
@ -113,137 +100,8 @@ public class ServerManager implements QuerySegmentWalker
this.cache = cache;
this.objectMapper = objectMapper;
this.dataSources = new HashMap<>();
this.cacheConfig = cacheConfig;
}
public Map<String, Long> getDataSourceSizes()
{
synchronized (dataSourceSizes) {
return dataSourceSizes.snapshot();
}
}
public Map<String, Long> getDataSourceCounts()
{
synchronized (dataSourceCounts) {
return dataSourceCounts.snapshot();
}
}
public boolean isSegmentCached(final DataSegment segment) throws SegmentLoadingException
{
return segmentLoader.isSegmentLoaded(segment);
}
/**
* Load a single segment.
*
* @param segment segment to load
*
* @return true if the segment was newly loaded, false if it was already loaded
*
* @throws SegmentLoadingException if the segment cannot be loaded
*/
public boolean loadSegment(final DataSegment segment) throws SegmentLoadingException
{
final Segment adapter;
try {
adapter = segmentLoader.getSegment(segment);
}
catch (SegmentLoadingException e) {
try {
segmentLoader.cleanup(segment);
}
catch (SegmentLoadingException e1) {
// ignore
}
throw e;
}
if (adapter == null) {
throw new SegmentLoadingException("Null adapter from loadSpec[%s]", segment.getLoadSpec());
}
synchronized (lock) {
String dataSource = segment.getDataSource();
VersionedIntervalTimeline<String, ReferenceCountingSegment> loadedIntervals = dataSources.get(dataSource);
if (loadedIntervals == null) {
loadedIntervals = new VersionedIntervalTimeline<>(Ordering.natural());
dataSources.put(dataSource, loadedIntervals);
}
PartitionHolder<ReferenceCountingSegment> entry = loadedIntervals.findEntry(
segment.getInterval(),
segment.getVersion()
);
if ((entry != null) && (entry.getChunk(segment.getShardSpec().getPartitionNum()) != null)) {
log.warn("Told to load a adapter for a segment[%s] that already exists", segment.getIdentifier());
return false;
}
loadedIntervals.add(
segment.getInterval(),
segment.getVersion(),
segment.getShardSpec().createChunk(new ReferenceCountingSegment(adapter))
);
synchronized (dataSourceSizes) {
dataSourceSizes.add(dataSource, segment.getSize());
}
synchronized (dataSourceCounts) {
dataSourceCounts.add(dataSource, 1L);
}
return true;
}
}
public void dropSegment(final DataSegment segment) throws SegmentLoadingException
{
String dataSource = segment.getDataSource();
synchronized (lock) {
VersionedIntervalTimeline<String, ReferenceCountingSegment> loadedIntervals = dataSources.get(dataSource);
if (loadedIntervals == null) {
log.info("Told to delete a queryable for a dataSource[%s] that doesn't exist.", dataSource);
return;
}
PartitionChunk<ReferenceCountingSegment> removed = loadedIntervals.remove(
segment.getInterval(),
segment.getVersion(),
segment.getShardSpec().createChunk((ReferenceCountingSegment) null)
);
ReferenceCountingSegment oldQueryable = (removed == null) ? null : removed.getObject();
if (oldQueryable != null) {
synchronized (dataSourceSizes) {
dataSourceSizes.add(dataSource, -segment.getSize());
}
synchronized (dataSourceCounts) {
dataSourceCounts.add(dataSource, -1L);
}
try {
log.info("Attempting to close segment %s", segment.getIdentifier());
oldQueryable.close();
}
catch (IOException e) {
log.makeAlert(e, "Exception closing segment")
.addData("dataSource", dataSource)
.addData("segmentId", segment.getIdentifier())
.emit();
}
} else {
log.info(
"Told to delete a queryable on dataSource[%s] for interval[%s] and version [%s] that I don't have.",
dataSource,
segment.getInterval(),
segment.getVersion()
);
}
}
segmentLoader.cleanup(segment);
this.segmentManager = segmentManager;
}
@Override
@ -263,7 +121,9 @@ public class ServerManager implements QuerySegmentWalker
}
String dataSourceName = getDataSourceName(dataSource);
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(dataSourceName);
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = segmentManager.getTimeline(
dataSourceName
);
if (timeline == null) {
return new NoopQueryRunner<T>();
@ -352,7 +212,7 @@ public class ServerManager implements QuerySegmentWalker
String dataSourceName = getDataSourceName(query.getDataSource());
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = segmentManager.getTimeline(
dataSourceName
);

View File

@ -0,0 +1,60 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.coordination;
public enum ServerType
{
HISTORICAL,
BRIDGE,
REALTIME {
@Override
public boolean isSegmentReplicationTarget()
{
return false;
}
};
/**
* Indicates this type of node is able to be a target of segment replication.
* @return true if it is available for replication
*
* @see io.druid.server.coordinator.rules.LoadRule
*/
boolean isSegmentReplicationTarget()
{
return true;
}
/**
* Indicates this type of node is able to be a target of segment broadcast.
*
* @return true if it is available for broadcast.
*/
boolean isSegmentBroadcastTarget()
{
return true;
}
static ServerType fromString(String type)
{
return ServerType.valueOf(type.toUpperCase());
}
}

View File

@ -34,6 +34,7 @@ import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.server.SegmentManager;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
@ -73,7 +74,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
private final CuratorFramework curator;
private final DataSegmentAnnouncer announcer;
private final DataSegmentServerAnnouncer serverAnnouncer;
private final ServerManager serverManager;
private final SegmentManager segmentManager;
private final ScheduledExecutorService exec;
private final ConcurrentSkipListSet<DataSegment> segmentsToDelete;
@ -90,7 +91,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
DataSegmentAnnouncer announcer,
DataSegmentServerAnnouncer serverAnnouncer,
CuratorFramework curator,
ServerManager serverManager,
SegmentManager segmentManager,
ScheduledExecutorFactory factory
)
{
@ -101,7 +102,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
this.curator = curator;
this.announcer = announcer;
this.serverAnnouncer = serverAnnouncer;
this.serverManager = serverManager;
this.segmentManager = segmentManager;
this.exec = factory.create(1, "ZkCoordinator-Exec--%d");
this.segmentsToDelete = new ConcurrentSkipListSet<>();
@ -267,7 +268,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
if (!segment.getIdentifier().equals(file.getName())) {
log.warn("Ignoring cache file[%s] for segment[%s].", file.getPath(), segment.getIdentifier());
ignored++;
} else if (serverManager.isSegmentCached(segment)) {
} else if (segmentManager.isSegmentCached(segment)) {
cachedSegments.add(segment);
} else {
log.warn("Unable to find cache file for %s. Deleting lookup entry", segment.getIdentifier());
@ -319,7 +320,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
{
final boolean loaded;
try {
loaded = serverManager.loadSegment(segment);
loaded = segmentManager.loadSegment(segment);
}
catch (Exception e) {
removeSegment(segment, callback);
@ -477,7 +478,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
try {
synchronized (lock) {
if (segmentsToDelete.remove(segment)) {
serverManager.dropSegment(segment);
segmentManager.dropSegment(segment);
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
if (!segmentInfoCacheFile.delete()) {

View File

@ -19,15 +19,18 @@
package io.druid.server.coordinator;
import com.google.common.collect.Maps;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Ordering;
import io.druid.client.ImmutableDruidServer;
import io.druid.java.util.common.IAE;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.List;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.Set;
/**
* Contains a representation of the current state of the cluster by tier.
@ -35,67 +38,107 @@ import java.util.stream.Collectors;
*/
public class DruidCluster
{
private final Map<String, MinMaxPriorityQueue<ServerHolder>> cluster;
private final Set<ServerHolder> realtimes;
private final Map<String, MinMaxPriorityQueue<ServerHolder>> historicals;
public DruidCluster()
{
this.cluster = Maps.newHashMap();
this.realtimes = new HashSet<>();
this.historicals = new HashMap<>();
}
public DruidCluster(Map<String, MinMaxPriorityQueue<ServerHolder>> cluster)
@VisibleForTesting
public DruidCluster(
@Nullable Set<ServerHolder> realtimes,
Map<String, MinMaxPriorityQueue<ServerHolder>> historicals
)
{
this.cluster = cluster;
this.realtimes = realtimes == null ? new HashSet<>() : new HashSet<>(realtimes);
this.historicals = historicals;
}
public void add(ServerHolder serverHolder)
{
ImmutableDruidServer server = serverHolder.getServer();
MinMaxPriorityQueue<ServerHolder> tierServers = cluster.get(server.getTier());
if (tierServers == null) {
tierServers = MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create();
cluster.put(server.getTier(), tierServers);
switch (serverHolder.getServer().getType()) {
case HISTORICAL:
addHistorical(serverHolder);
break;
case REALTIME:
addRealtime(serverHolder);
break;
case BRIDGE:
addHistorical(serverHolder);
break;
default:
throw new IAE("unknown server type[%s]", serverHolder.getServer().getType());
}
}
private void addRealtime(ServerHolder serverHolder)
{
realtimes.add(serverHolder);
}
private void addHistorical(ServerHolder serverHolder)
{
final ImmutableDruidServer server = serverHolder.getServer();
final MinMaxPriorityQueue<ServerHolder> tierServers = historicals.computeIfAbsent(
server.getTier(),
k -> MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create()
);
tierServers.add(serverHolder);
}
public Map<String, MinMaxPriorityQueue<ServerHolder>> getCluster()
public Set<ServerHolder> getRealtimes()
{
return cluster;
return realtimes;
}
public Map<String, MinMaxPriorityQueue<ServerHolder>> getHistoricals()
{
return historicals;
}
public Iterable<String> getTierNames()
{
return cluster.keySet();
return historicals.keySet();
}
public MinMaxPriorityQueue<ServerHolder> getServersByTier(String tier)
public MinMaxPriorityQueue<ServerHolder> getHistoricalsByTier(String tier)
{
return cluster.get(tier);
return historicals.get(tier);
}
public List<ServerHolder> getAllServers()
public Collection<ServerHolder> getAllServers()
{
return cluster.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
return historicals.values().stream()
.flatMap(Collection::stream)
.collect(() -> realtimes, Set::add, Set::addAll);
}
public Iterable<MinMaxPriorityQueue<ServerHolder>> getSortedServersByTier()
public Iterable<MinMaxPriorityQueue<ServerHolder>> getSortedHistoricalsByTier()
{
return cluster.values();
return historicals.values();
}
public boolean isEmpty()
{
return cluster.isEmpty();
return historicals.isEmpty() && realtimes.isEmpty();
}
public boolean hasHistoricals()
{
return !historicals.isEmpty();
}
public boolean hasRealtimes()
{
return !realtimes.isEmpty();
}
public boolean hasTier(String tier)
{
MinMaxPriorityQueue<ServerHolder> servers = cluster.get(tier);
MinMaxPriorityQueue<ServerHolder> servers = historicals.get(tier);
return (servers == null) || servers.isEmpty();
}
public MinMaxPriorityQueue<ServerHolder> get(String tier)
{
return cluster.get(tier);
}
}

View File

@ -751,7 +751,7 @@ public class DruidCoordinator
DruidServer input
)
{
return input.isAssignable();
return input.segmentReplicatable();
}
}
).transform(

View File

@ -38,7 +38,7 @@ public class SegmentReplicantLookup
final Table<String, String, Integer> segmentsInCluster = HashBasedTable.create();
final Table<String, String, Integer> loadingSegments = HashBasedTable.create();
for (MinMaxPriorityQueue<ServerHolder> serversByType : cluster.getSortedServersByTier()) {
for (MinMaxPriorityQueue<ServerHolder> serversByType : cluster.getSortedHistoricalsByTier()) {
for (ServerHolder serverHolder : serversByType) {
ImmutableDruidServer server = serverHolder.getServer();

View File

@ -23,6 +23,8 @@ import io.druid.client.ImmutableDruidServer;
import io.druid.java.util.common.logger.Logger;
import io.druid.timeline.DataSegment;
import java.util.Objects;
/**
*/
public class ServerHolder implements Comparable<ServerHolder>
@ -122,21 +124,20 @@ public class ServerHolder implements Comparable<ServerHolder>
ServerHolder that = (ServerHolder) o;
if (peon != null ? !peon.equals(that.peon) : that.peon != null) {
return false;
}
if (server != null ? !server.equals(that.server) : that.server != null) {
if (!this.server.getHost().equals(that.server.getHost())) {
return false;
}
return true;
if (!this.server.getTier().equals(that.getServer().getTier())) {
return false;
}
return this.server.getType().equals(that.getServer().getType());
}
@Override
public int hashCode()
{
int result = server != null ? server.hashCode() : 0;
result = 31 * result + (peon != null ? peon.hashCode() : 0);
return result;
return Objects.hash(server.getHost(), server.getTier(), server.getType());
}
}

View File

@ -88,7 +88,7 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
final int maxSegmentsToMove = params.getCoordinatorDynamicConfig().getMaxSegmentsToMove();
for (Map.Entry<String, MinMaxPriorityQueue<ServerHolder>> entry :
params.getDruidCluster().getCluster().entrySet()) {
params.getDruidCluster().getHistoricals().entrySet()) {
String tier = entry.getKey();
if (currentlyMovingSegments.get(tier) == null) {

View File

@ -55,7 +55,7 @@ public class DruidCoordinatorCleanupOvershadowed implements DruidCoordinatorHelp
DruidCluster cluster = params.getDruidCluster();
Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = Maps.newHashMap();
for (MinMaxPriorityQueue<ServerHolder> serverHolders : cluster.getSortedServersByTier()) {
for (MinMaxPriorityQueue<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
for (ServerHolder serverHolder : serverHolders) {
ImmutableDruidServer server = serverHolder.getServer();

View File

@ -64,7 +64,7 @@ public class DruidCoordinatorCleanupUnneeded implements DruidCoordinatorHelper
// This is done to prevent a race condition in which the coordinator would drop all segments if it started running
// cleanup before it finished polling the metadata storage for available segments for the first time.
if (!availableSegments.isEmpty()) {
for (MinMaxPriorityQueue<ServerHolder> serverHolders : cluster.getSortedServersByTier()) {
for (MinMaxPriorityQueue<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
for (ServerHolder serverHolder : serverHolders) {
ImmutableDruidServer server = serverHolder.getServer();

View File

@ -84,7 +84,7 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper
for (Map.Entry<String, AtomicLong> entry : assigned.entrySet()) {
log.info(
"[%s] : Assigned %s segments among %,d servers",
entry.getKey(), entry.getValue().get(), cluster.get(entry.getKey()).size()
entry.getKey(), entry.getValue().get(), cluster.getHistoricalsByTier(entry.getKey()).size()
);
}
}
@ -99,7 +99,7 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper
for (Map.Entry<String, AtomicLong> entry : dropped.entrySet()) {
log.info(
"[%s] : Dropped %s segments among %,d servers",
entry.getKey(), entry.getValue().get(), cluster.get(entry.getKey()).size()
entry.getKey(), entry.getValue().get(), cluster.getHistoricalsByTier(entry.getKey()).size()
);
}
}
@ -152,7 +152,7 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper
for (Map.Entry<String, AtomicLong> entry : unneeded.entrySet()) {
log.info(
"[%s] : Removed %s unneeded segments among %,d servers",
entry.getKey(), entry.getValue().get(), cluster.get(entry.getKey()).size()
entry.getKey(), entry.getValue().get(), cluster.getHistoricalsByTier(entry.getKey()).size()
);
}
}
@ -187,14 +187,14 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper
}
}
log.info("Load Queues:");
for (MinMaxPriorityQueue<ServerHolder> serverHolders : cluster.getSortedServersByTier()) {
for (MinMaxPriorityQueue<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
for (ServerHolder serverHolder : serverHolders) {
ImmutableDruidServer server = serverHolder.getServer();
LoadQueuePeon queuePeon = serverHolder.getPeon();
log.info(
"Server[%s, %s, %s] has %,d left to load, %,d left to drop, %,d bytes queued, %,d bytes served.",
server.getName(),
server.getType(),
server.getType().toString(),
server.getTier(),
queuePeon.getSegmentsToLoad().size(),
queuePeon.getSegmentsToDrop().size(),

View File

@ -23,7 +23,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.emitter.EmittingLogger;
import io.druid.java.util.common.IAE;
import io.druid.server.coordinator.BalancerStrategy;
import io.druid.server.coordinator.CoordinatorStats;
@ -64,7 +63,7 @@ public abstract class LoadRule implements Rule
final int loadedReplicantsInTier = params.getSegmentReplicantLookup()
.getLoadedReplicants(segment.getIdentifier(), tier);
final MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(tier);
final MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getHistoricalsByTier(tier);
if (serverQueue == null) {
log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", tier).emit();
continue;
@ -176,8 +175,6 @@ public abstract class LoadRule implements Rule
}
}
final ReplicationThrottler replicationManager = params.getReplicationManager();
// Find all instances of this segment across tiers
Map<String, Integer> replicantsByTier = params.getSegmentReplicantLookup().getClusterTiers(segment.getIdentifier());
@ -188,7 +185,7 @@ public abstract class LoadRule implements Rule
stats.addToTieredStat(DROPPED_COUNT, tier, 0);
MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().get(tier);
MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getHistoricalsByTier(tier);
if (serverQueue == null) {
log.makeAlert("No holders found for tier[%s]", entry.getKey()).emit();
continue;

View File

@ -26,7 +26,7 @@ import com.metamx.metrics.AbstractMonitor;
import io.druid.client.DruidServerConfig;
import io.druid.java.util.common.collect.CountingMap;
import io.druid.query.DruidMetrics;
import io.druid.server.coordination.ServerManager;
import io.druid.server.SegmentManager;
import io.druid.server.coordination.ZkCoordinator;
import io.druid.timeline.DataSegment;
@ -35,18 +35,18 @@ import java.util.Map;
public class HistoricalMetricsMonitor extends AbstractMonitor
{
private final DruidServerConfig serverConfig;
private final ServerManager serverManager;
private final SegmentManager segmentManager;
private final ZkCoordinator zkCoordinator;
@Inject
public HistoricalMetricsMonitor(
DruidServerConfig serverConfig,
ServerManager serverManager,
SegmentManager segmentManager,
ZkCoordinator zkCoordinator
)
{
this.serverConfig = serverConfig;
this.serverManager = serverManager;
this.segmentManager = segmentManager;
this.zkCoordinator = zkCoordinator;
}
@ -73,7 +73,7 @@ public class HistoricalMetricsMonitor extends AbstractMonitor
);
}
for (Map.Entry<String, Long> entry : serverManager.getDataSourceSizes().entrySet()) {
for (Map.Entry<String, Long> entry : segmentManager.getDataSourceSizes().entrySet()) {
String dataSource = entry.getKey();
long used = entry.getValue();
@ -88,7 +88,7 @@ public class HistoricalMetricsMonitor extends AbstractMonitor
emitter.emit(builder.build("segment/usedPercent", usedPercent));
}
for (Map.Entry<String, Long> entry : serverManager.getDataSourceCounts().entrySet()) {
for (Map.Entry<String, Long> entry : segmentManager.getDataSourceCounts().entrySet()) {
String dataSource = entry.getKey();
long count = entry.getValue();
final ServiceMetricEvent.Builder builder =

View File

@ -124,7 +124,7 @@ public class BatchServerInventoryViewTest
"id",
"host",
Long.MAX_VALUE,
"type",
"historical",
"tier",
0
);
@ -443,7 +443,7 @@ public class BatchServerInventoryViewTest
"id",
"host",
Long.MAX_VALUE,
"type",
"historical",
"tier",
0
),

View File

@ -51,7 +51,7 @@ public class ImmutableSegmentLoadInfoTest
null,
NoneShardSpec.instance(),
0, 0
), Sets.newHashSet(new DruidServerMetadata("a", "host", 10, "type", "tier", 1))
), Sets.newHashSet(new DruidServerMetadata("a", "host", 10, "historical", "tier", 1))
);
ImmutableSegmentLoadInfo serde = mapper.readValue(

View File

@ -79,7 +79,7 @@ public class ClientInfoResourceTest
public void setup()
{
VersionedIntervalTimeline<String, ServerSelector> timeline = new VersionedIntervalTimeline<>(Ordering.<String>natural());
DruidServer server = new DruidServer("name", "host", 1234, "type", "tier", 0);
DruidServer server = new DruidServer("name", "host", 1234, "historical", "tier", 0);
addSegment(timeline, server, "1960-02-13/1961-02-14", ImmutableList.of("d5"), ImmutableList.of("m5"), "v0");

View File

@ -64,6 +64,7 @@ import io.druid.segment.Segment;
import io.druid.segment.StorageAdapter;
import io.druid.segment.loading.SegmentLoader;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.server.SegmentManager;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
@ -95,6 +96,7 @@ public class ServerManagerTest
private CountDownLatch queryWaitYieldLatch;
private CountDownLatch queryNotifyLatch;
private ExecutorService serverManagerExec;
private SegmentManager segmentManager;
@Before
public void setUp() throws IOException
@ -106,7 +108,7 @@ public class ServerManagerTest
queryNotifyLatch = new CountDownLatch(1);
factory = new MyQueryRunnerFactory(queryWaitLatch, queryWaitYieldLatch, queryNotifyLatch);
serverManagerExec = Executors.newFixedThreadPool(2);
serverManager = new ServerManager(
segmentManager = new SegmentManager(
new SegmentLoader()
{
@Override
@ -135,7 +137,9 @@ public class ServerManagerTest
{
}
},
}
);
serverManager = new ServerManager(
new QueryRunnerFactoryConglomerate()
{
@Override
@ -149,7 +153,8 @@ public class ServerManagerTest
MoreExecutors.sameThreadExecutor(),
new DefaultObjectMapper(),
new LocalCacheProvider().get(),
new CacheConfig()
new CacheConfig(),
segmentManager
);
loadQueryable("test", "1", new Interval("P1d/2011-04-01"));
@ -459,7 +464,7 @@ public class ServerManagerTest
public void loadQueryable(String dataSource, String version, Interval interval) throws IOException
{
try {
serverManager.loadSegment(
segmentManager.loadSegment(
new DataSegment(
dataSource,
interval,
@ -481,7 +486,7 @@ public class ServerManagerTest
public void dropQueryable(String dataSource, String version, Interval interval)
{
try {
serverManager.dropSegment(
segmentManager.dropSegment(
new DataSegment(
dataSource,
interval,

View File

@ -0,0 +1,48 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.coordination;
import org.junit.Assert;
import org.junit.Test;
public class ServerTypeTest
{
@Test
public void testAssignable()
{
Assert.assertTrue(ServerType.HISTORICAL.isSegmentReplicationTarget());
Assert.assertTrue(ServerType.BRIDGE.isSegmentReplicationTarget());
Assert.assertFalse(ServerType.REALTIME.isSegmentReplicationTarget());
}
@Test
public void testFromString()
{
Assert.assertEquals(ServerType.HISTORICAL, ServerType.fromString("historical"));
Assert.assertEquals(ServerType.BRIDGE, ServerType.fromString("bridge"));
Assert.assertEquals(ServerType.REALTIME, ServerType.fromString("realtime"));
}
@Test(expected = IllegalArgumentException.class)
public void testInvalidName()
{
ServerType.fromString("invalid");
}
}

View File

@ -43,6 +43,7 @@ import io.druid.query.NoopQueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.loading.CacheTestSegmentLoader;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.server.SegmentManager;
import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.metrics.NoopServiceEmitter;
@ -81,7 +82,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
"dummyServer",
"dummyHost",
0,
"dummyType",
"historical",
"normal",
0
);
@ -93,6 +94,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
private AtomicInteger announceCount;
private ConcurrentSkipListSet<DataSegment> segmentsAnnouncedByMe;
private CacheTestSegmentLoader segmentLoader;
private SegmentManager segmentManager;
private List<Runnable> scheduledRunnable;
@Before
@ -116,16 +118,17 @@ public class ZkCoordinatorTest extends CuratorTestBase
scheduledRunnable = Lists.newArrayList();
segmentLoader = new CacheTestSegmentLoader();
segmentManager = new SegmentManager(segmentLoader);
serverManager = new ServerManager(
segmentLoader,
new NoopQueryRunnerFactoryConglomerate(),
new NoopServiceEmitter(),
MoreExecutors.sameThreadExecutor(),
MoreExecutors.sameThreadExecutor(),
new DefaultObjectMapper(),
new LocalCacheProvider().get(),
new CacheConfig()
new CacheConfig(),
segmentManager
);
final ZkPathsConfig zkPaths = new ZkPathsConfig()
@ -220,7 +223,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
announcer,
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
curator,
serverManager,
segmentManager,
new ScheduledExecutorFactory()
{
@Override
@ -388,12 +391,12 @@ public class ZkCoordinatorTest extends CuratorTestBase
}
checkCache(segments);
Assert.assertTrue(serverManager.getDataSourceCounts().isEmpty());
Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
zkCoordinator.start();
Assert.assertTrue(!serverManager.getDataSourceCounts().isEmpty());
Assert.assertTrue(!segmentManager.getDataSourceCounts().isEmpty());
for (int i = 0; i < COUNT; ++i) {
Assert.assertEquals(11L, serverManager.getDataSourceCounts().get("test" + i).longValue());
Assert.assertEquals(2L, serverManager.getDataSourceCounts().get("test_two" + i).longValue());
Assert.assertEquals(11L, segmentManager.getDataSourceCounts().get("test" + i).longValue());
Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue());
}
Assert.assertEquals(13 * COUNT, announceCount.get());
zkCoordinator.stop();
@ -512,11 +515,12 @@ public class ZkCoordinatorTest extends CuratorTestBase
}
);
binder.bind(DruidServerMetadata.class)
.toInstance(new DruidServerMetadata("dummyServer", "dummyHost", 0, "dummyType", "normal", 0));
.toInstance(new DruidServerMetadata("dummyServer", "dummyHost", 0, "historical", "normal", 0));
binder.bind(DataSegmentAnnouncer.class).toInstance(announcer);
binder.bind(DataSegmentServerAnnouncer.class).toInstance(EasyMock.createNiceMock(DataSegmentServerAnnouncer.class));
binder.bind(CuratorFramework.class).toInstance(curator);
binder.bind(ServerManager.class).toInstance(serverManager);
binder.bind(SegmentManager.class).toInstance(segmentManager);
binder.bind(ScheduledExecutorFactory.class).toInstance(ScheduledExecutors.createFactory(new Lifecycle()));
}
@ -540,13 +544,13 @@ public class ZkCoordinatorTest extends CuratorTestBase
}
checkCache(segments);
Assert.assertTrue(serverManager.getDataSourceCounts().isEmpty());
Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
zkCoordinator.start();
Assert.assertTrue(!serverManager.getDataSourceCounts().isEmpty());
Assert.assertTrue(!segmentManager.getDataSourceCounts().isEmpty());
for (int i = 0; i < COUNT; ++i) {
Assert.assertEquals(3L, serverManager.getDataSourceCounts().get("test" + i).longValue());
Assert.assertEquals(2L, serverManager.getDataSourceCounts().get("test_two" + i).longValue());
Assert.assertEquals(3L, segmentManager.getDataSourceCounts().get("test" + i).longValue());
Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue());
}
Assert.assertEquals(5 * COUNT, announceCount.get());
zkCoordinator.stop();

View File

@ -106,7 +106,7 @@ public class BatchDataSegmentAnnouncerTest
"id",
"host",
Long.MAX_VALUE,
"type",
"historical",
"tier",
0
),

View File

@ -61,7 +61,7 @@ public class CostBalancerStrategyTest
serverHolderList.add(
new ServerHolder(
new ImmutableDruidServer(
new DruidServerMetadata("DruidServer_Name_" + i, "localhost", 10000000L, "hot", "hot", 1),
new DruidServerMetadata("DruidServer_Name_" + i, "localhost", 10000000L, "historical", "hot", 1),
3000L,
ImmutableMap.of("DUMMY", EasyMock.createMock(ImmutableDruidDataSource.class)),
ImmutableMap.copyOf(segments)

View File

@ -60,7 +60,7 @@ public class DiskNormalizedCostBalancerStrategyTest
serverHolderList.add(
new ServerHolder(
new ImmutableDruidServer(
new DruidServerMetadata("DruidServer_Name_" + i, "localhost", 10000000L, "hot", "hot", 1),
new DruidServerMetadata("DruidServer_Name_" + i, "localhost", 10000000L, "historical", "hot", 1),
3000L,
ImmutableMap.of("DUMMY", EasyMock.createMock(ImmutableDruidDataSource.class)),
ImmutableMap.copyOf(segments)

View File

@ -0,0 +1,204 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.coordinator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Ordering;
import io.druid.client.ImmutableDruidDataSource;
import io.druid.client.ImmutableDruidServer;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.coordination.ServerType;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class DruidClusterTest
{
private static final List<DataSegment> segments = ImmutableList.of(
new DataSegment(
"test",
new Interval("2015-04-12/2015-04-13"),
"1",
ImmutableMap.of("containerName", "container1", "blobPath", "blobPath1"),
null,
null,
NoneShardSpec.instance(),
0,
1
),
new DataSegment(
"test",
new Interval("2015-04-12/2015-04-13"),
"1",
ImmutableMap.of("containerName", "container2", "blobPath", "blobPath2"),
null,
null,
NoneShardSpec.instance(),
0,
1
)
);
private static final Map<String, ImmutableDruidDataSource> dataSources = ImmutableMap.of(
"src1",
new ImmutableDruidDataSource(
"src1",
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableSet.of()
),
"src2",
new ImmutableDruidDataSource(
"src2",
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableSet.of()
)
);
private static final ServerHolder newRealtime = new ServerHolder(
new ImmutableDruidServer(
new DruidServerMetadata("name1", "host2", 100L, ServerType.REALTIME.name(), "tier1", 0),
0L,
ImmutableMap.of(
"src1",
dataSources.get("src1")
),
ImmutableMap.of(
"segment1",
segments.get(0)
)
),
new LoadQueuePeonTester()
);
private static final ServerHolder newHistorical = new ServerHolder(
new ImmutableDruidServer(
new DruidServerMetadata("name1", "host2", 100L, ServerType.HISTORICAL.name(), "tier1", 0),
0L,
ImmutableMap.of(
"src1",
dataSources.get("src1")
),
ImmutableMap.of(
"segment1",
segments.get(0)
)
),
new LoadQueuePeonTester()
);
private DruidCluster cluster;
@Before
public void setup()
{
cluster = new DruidCluster(
ImmutableSet.of(
new ServerHolder(
new ImmutableDruidServer(
new DruidServerMetadata("name1", "host1", 100L, ServerType.REALTIME.name(), "tier1", 0),
0L,
ImmutableMap.of(
"src1",
dataSources.get("src1")
),
ImmutableMap.of(
"segment1",
segments.get(0)
)
),
new LoadQueuePeonTester()
)
),
ImmutableMap.of(
"tier1",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
ImmutableList.of(
new ServerHolder(
new ImmutableDruidServer(
new DruidServerMetadata("name1", "host1", 100L, ServerType.HISTORICAL.name(), "tier1", 0),
0L,
ImmutableMap.of(
"src1",
dataSources.get("src1")
),
ImmutableMap.of(
"segment1",
segments.get(0)
)
),
new LoadQueuePeonTester()
)
)
)
)
);
}
@Test
public void testAdd()
{
Assert.assertEquals(1, cluster.getHistoricals().values().stream().mapToInt(Collection::size).sum());
Assert.assertEquals(1, cluster.getRealtimes().size());
cluster.add(newRealtime);
Assert.assertEquals(1, cluster.getHistoricals().values().stream().mapToInt(Collection::size).sum());
Assert.assertEquals(2, cluster.getRealtimes().size());
cluster.add(newHistorical);
Assert.assertEquals(2, cluster.getHistoricals().values().stream().mapToInt(Collection::size).sum());
Assert.assertEquals(2, cluster.getRealtimes().size());
}
@Test
public void testGetAllServers()
{
cluster.add(newRealtime);
cluster.add(newHistorical);
final Collection<ServerHolder> allServers = cluster.getAllServers();
Assert.assertEquals(4, allServers.size());
Assert.assertTrue(allServers.containsAll(cluster.getRealtimes()));
Assert.assertTrue(
allServers.containsAll(
cluster.getHistoricals().values().stream().flatMap(Collection::stream).collect(Collectors.toList())
)
);
}
@Test
public void testIsEmpty()
{
final DruidCluster emptyCluster = new DruidCluster();
Assert.assertFalse(cluster.isEmpty());
Assert.assertTrue(emptyCluster.isEmpty());
}
}

View File

@ -134,6 +134,7 @@ public class DruidCoordinatorBalancerProfiler
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(
new DruidCluster(
null,
ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
"normal",
MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator)
@ -161,6 +162,7 @@ public class DruidCoordinatorBalancerProfiler
.withSegmentReplicantLookup(
SegmentReplicantLookup.make(
new DruidCluster(
null,
ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
"normal",
MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator)
@ -216,6 +218,7 @@ public class DruidCoordinatorBalancerProfiler
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(
new DruidCluster(
null,
ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
"normal",
MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator)

View File

@ -179,6 +179,7 @@ public class DruidCoordinatorBalancerTest
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(
new DruidCluster(
null,
ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
"normal",
MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator)
@ -260,6 +261,7 @@ public class DruidCoordinatorBalancerTest
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(
new DruidCluster(
null,
ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
"normal",
MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator)
@ -354,6 +356,7 @@ public class DruidCoordinatorBalancerTest
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(
new DruidCluster(
null,
ImmutableMap.<String, MinMaxPriorityQueue<ServerHolder>>of(
"normal",
MinMaxPriorityQueue.orderedBy(DruidCoordinatorBalancerTester.percentUsedComparator)

View File

@ -130,6 +130,7 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.replay(databaseRuleManager);
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
@ -237,6 +238,7 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.replay(databaseRuleManager);
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
@ -349,6 +351,7 @@ public class DruidCoordinatorRuleRunnerTest
}
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
@ -430,6 +433,7 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.replay(databaseRuleManager);
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
@ -489,6 +493,7 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.replay(databaseRuleManager);
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
@ -562,6 +567,7 @@ public class DruidCoordinatorRuleRunnerTest
}
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
@ -642,6 +648,7 @@ public class DruidCoordinatorRuleRunnerTest
}
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
@ -728,6 +735,7 @@ public class DruidCoordinatorRuleRunnerTest
}
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
@ -815,6 +823,7 @@ public class DruidCoordinatorRuleRunnerTest
server2.addDataSegment(segment.getIdentifier(), segment);
}
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
@ -916,6 +925,7 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.replay(anotherMockPeon);
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
@ -988,6 +998,7 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.replay(databaseRuleManager);
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
@ -1112,6 +1123,7 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.replay(databaseRuleManager);
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
@ -1231,6 +1243,7 @@ public class DruidCoordinatorRuleRunnerTest
}
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"normal",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
@ -1317,6 +1330,7 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.replay(databaseRuleManager);
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
DruidServer.DEFAULT_TIER,
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(

View File

@ -215,7 +215,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
EasyMock.replay(metadataRuleManager);
EasyMock.expect(druidServer.toImmutableDruidServer()).andReturn(
new ImmutableDruidServer(
new DruidServerMetadata("from", null, 5L, null, null, 0),
new DruidServerMetadata("from", null, 5L, "historical", null, 0),
1L,
null,
ImmutableMap.of("dummySegment", segment)
@ -226,7 +226,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
druidServer2 = EasyMock.createMock(DruidServer.class);
EasyMock.expect(druidServer2.toImmutableDruidServer()).andReturn(
new ImmutableDruidServer(
new DruidServerMetadata("to", null, 5L, null, null, 0),
new DruidServerMetadata("to", null, 5L, "historical", null, 0),
1L,
null,
ImmutableMap.of("dummySegment2", segment)

View File

@ -0,0 +1,246 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.coordinator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.druid.client.ImmutableDruidDataSource;
import io.druid.client.ImmutableDruidServer;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.coordination.ServerType;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.Map;
public class ServerHolderTest
{
private static final List<DataSegment> segments = ImmutableList.of(
new DataSegment(
"test",
new Interval("2015-04-12/2015-04-13"),
"1",
ImmutableMap.of("containerName", "container1", "blobPath", "blobPath1"),
null,
null,
NoneShardSpec.instance(),
0,
1
),
new DataSegment(
"test",
new Interval("2015-04-12/2015-04-13"),
"1",
ImmutableMap.of("containerName", "container2", "blobPath", "blobPath2"),
null,
null,
NoneShardSpec.instance(),
0,
1
)
);
private static final Map<String, ImmutableDruidDataSource> dataSources = ImmutableMap.of(
"src1",
new ImmutableDruidDataSource(
"src1",
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableSet.of()
),
"src2",
new ImmutableDruidDataSource(
"src2",
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableSet.of()
)
);
@Test
public void testCompareTo() throws Exception
{
// available size of 100
final ServerHolder h1 = new ServerHolder(
new ImmutableDruidServer(
new DruidServerMetadata("name1", "host1", 100L, ServerType.HISTORICAL.name(), "tier1", 0),
0L,
ImmutableMap.of(
"src1",
dataSources.get("src1")
),
ImmutableMap.of(
"segment1",
segments.get(0)
)
),
new LoadQueuePeonTester()
);
// available size of 100
final ServerHolder h2 = new ServerHolder(
new ImmutableDruidServer(
new DruidServerMetadata("name1", "host1", 200L, ServerType.HISTORICAL.name(), "tier1", 0),
100L,
ImmutableMap.of(
"src1",
dataSources.get("src1")
),
ImmutableMap.of(
"segment1",
segments.get(0)
)
),
new LoadQueuePeonTester()
);
// available size of 10
final ServerHolder h3 = new ServerHolder(
new ImmutableDruidServer(
new DruidServerMetadata("name1", "host1", 1000L, ServerType.HISTORICAL.name(), "tier1", 0),
990L,
ImmutableMap.of(
"src1",
dataSources.get("src1")
),
ImmutableMap.of(
"segment1",
segments.get(0)
)
),
new LoadQueuePeonTester()
);
// available size of 50
final ServerHolder h4 = new ServerHolder(
new ImmutableDruidServer(
new DruidServerMetadata("name1", "host1", 50L, ServerType.HISTORICAL.name(), "tier1", 0),
0L,
ImmutableMap.of(
"src1",
dataSources.get("src1")
),
ImmutableMap.of(
"segment1",
segments.get(0)
)
),
new LoadQueuePeonTester()
);
Assert.assertEquals(0, h1.compareTo(h2));
Assert.assertEquals(-1, h3.compareTo(h1));
Assert.assertEquals(-1, h3.compareTo(h4));
}
@Test
public void testEquals() throws Exception
{
final ServerHolder h1 = new ServerHolder(
new ImmutableDruidServer(
new DruidServerMetadata("name1", "host1", 100L, ServerType.HISTORICAL.name(), "tier1", 0),
0L,
ImmutableMap.of(
"src1",
dataSources.get("src1")
),
ImmutableMap.of(
"segment1",
segments.get(0)
)
),
new LoadQueuePeonTester()
);
final ServerHolder h2 = new ServerHolder(
new ImmutableDruidServer(
new DruidServerMetadata("name2", "host1", 200L, ServerType.HISTORICAL.name(), "tier1", 0),
100L,
ImmutableMap.of(
"src1",
dataSources.get("src1")
),
ImmutableMap.of(
"segment1",
segments.get(0)
)
),
new LoadQueuePeonTester()
);
final ServerHolder h3 = new ServerHolder(
new ImmutableDruidServer(
new DruidServerMetadata("name1", "host2", 200L, ServerType.HISTORICAL.name(), "tier1", 0),
100L,
ImmutableMap.of(
"src1",
dataSources.get("src1")
),
ImmutableMap.of(
"segment1",
segments.get(0)
)
),
new LoadQueuePeonTester()
);
final ServerHolder h4 = new ServerHolder(
new ImmutableDruidServer(
new DruidServerMetadata("name1", "host1", 200L, ServerType.HISTORICAL.name(), "tier2", 0),
100L,
ImmutableMap.of(
"src1",
dataSources.get("src1")
),
ImmutableMap.of(
"segment1",
segments.get(0)
)
),
new LoadQueuePeonTester()
);
final ServerHolder h5 = new ServerHolder(
new ImmutableDruidServer(
new DruidServerMetadata("name1", "host1", 100L, ServerType.REALTIME.name(), "tier1", 0),
0L,
ImmutableMap.of(
"src1",
dataSources.get("src1")
),
ImmutableMap.of(
"segment1",
segments.get(0)
)
),
new LoadQueuePeonTester()
);
Assert.assertEquals(h1, h2);
Assert.assertNotEquals(h1, h3);
Assert.assertNotEquals(h1, h4);
Assert.assertNotEquals(h1, h5);
}
}

View File

@ -71,6 +71,7 @@ public class DruidCoordinatorCleanupOvershadowedTest
availableSegments = ImmutableList.of(segmentV1, segmentV0, segmentV2);
druidCluster = new DruidCluster(
null,
ImmutableMap.of("normal", MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Collections.singletonList(new ServerHolder(druidServer, mockPeon))
)));

View File

@ -188,6 +188,7 @@ public class BroadcastDistributionRuleTest
);
druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(

View File

@ -158,6 +158,7 @@ public class LoadRuleTest
};
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
@ -282,6 +283,7 @@ public class LoadRuleTest
);
server2.addDataSegment(segment.getIdentifier(), segment);
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
@ -374,6 +376,7 @@ public class LoadRuleTest
};
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
@ -482,6 +485,7 @@ public class LoadRuleTest
server2.addDataSegment(segment.getIdentifier(), segment);
DruidCluster druidCluster = new DruidCluster(
null,
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(

View File

@ -41,7 +41,7 @@ public class ServersResourceTest {
@Before
public void setUp()
{
DruidServer dummyServer = new DruidServer("dummy", "host", 1234L, "type", "tier", 0);
DruidServer dummyServer = new DruidServer("dummy", "host", 1234L, "historical", "tier", 0);
DataSegment segment = DataSegment.builder()
.dataSource("dataSource")
.interval(new Interval("2016-03-22T14Z/2016-03-22T15Z"))
@ -65,7 +65,7 @@ public class ServersResourceTest {
String result = objectMapper.writeValueAsString(res.getEntity());
String expected = "[{\"host\":\"host\","
+ "\"maxSize\":1234,"
+ "\"type\":\"type\","
+ "\"type\":\"HISTORICAL\","
+ "\"tier\":\"tier\","
+ "\"priority\":0,"
+ "\"segments\":{\"dataSource_2016-03-22T14:00:00.000Z_2016-03-22T15:00:00.000Z_v0\":"
@ -80,7 +80,7 @@ public class ServersResourceTest {
{
Response res = serversResource.getClusterServers(null, "simple");
String result = objectMapper.writeValueAsString(res.getEntity());
String expected = "[{\"host\":\"host\",\"tier\":\"tier\",\"type\":\"type\",\"priority\":0,\"currSize\":1,\"maxSize\":1234}]";
String expected = "[{\"host\":\"host\",\"tier\":\"tier\",\"type\":\"HISTORICAL\",\"priority\":0,\"currSize\":1,\"maxSize\":1234}]";
Assert.assertEquals(expected, result);
}
@ -91,7 +91,7 @@ public class ServersResourceTest {
String result = objectMapper.writeValueAsString(res.getEntity());
String expected = "{\"host\":\"host\","
+ "\"maxSize\":1234,"
+ "\"type\":\"type\","
+ "\"type\":\"HISTORICAL\","
+ "\"tier\":\"tier\","
+ "\"priority\":0,"
+ "\"segments\":{\"dataSource_2016-03-22T14:00:00.000Z_2016-03-22T15:00:00.000Z_v0\":"
@ -106,7 +106,7 @@ public class ServersResourceTest {
{
Response res = serversResource.getServer(server.getName(), "simple");
String result = objectMapper.writeValueAsString(res.getEntity());
String expected = "{\"host\":\"host\",\"tier\":\"tier\",\"type\":\"type\",\"priority\":0,\"currSize\":1,\"maxSize\":1234}";
String expected = "{\"host\":\"host\",\"tier\":\"tier\",\"type\":\"HISTORICAL\",\"priority\":0,\"currSize\":1,\"maxSize\":1234}";
Assert.assertEquals(expected, result);
}

View File

@ -27,7 +27,7 @@ import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceEventBuilder;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.DruidServerConfig;
import io.druid.server.coordination.ServerManager;
import io.druid.server.SegmentManager;
import io.druid.server.coordination.ZkCoordinator;
import io.druid.timeline.DataSegment;
import org.easymock.Capture;
@ -47,7 +47,7 @@ import java.util.Map;
public class HistoricalMetricsMonitorTest extends EasyMockSupport
{
private DruidServerConfig druidServerConfig;
private ServerManager serverManager;
private SegmentManager segmentManager;
private ZkCoordinator zkCoordinator;
private ServiceEmitter serviceEmitter;
@ -55,7 +55,7 @@ public class HistoricalMetricsMonitorTest extends EasyMockSupport
public void setUp()
{
druidServerConfig = EasyMock.createStrictMock(DruidServerConfig.class);
serverManager = EasyMock.createStrictMock(ServerManager.class);
segmentManager = EasyMock.createStrictMock(SegmentManager.class);
zkCoordinator = EasyMock.createStrictMock(ZkCoordinator.class);
serviceEmitter = EasyMock.createStrictMock(ServiceEmitter.class);
}
@ -84,17 +84,17 @@ public class HistoricalMetricsMonitorTest extends EasyMockSupport
EasyMock.expect(zkCoordinator.getPendingDeleteSnapshot()).andReturn(ImmutableList.of(dataSegment)).once();
EasyMock.expect(druidServerConfig.getTier()).andReturn(tier).once();
EasyMock.expect(druidServerConfig.getPriority()).andReturn(priority).once();
EasyMock.expect(serverManager.getDataSourceSizes()).andReturn(ImmutableMap.of(dataSource, size));
EasyMock.expect(segmentManager.getDataSourceSizes()).andReturn(ImmutableMap.of(dataSource, size));
EasyMock.expect(druidServerConfig.getTier()).andReturn(tier).once();
EasyMock.expect(druidServerConfig.getPriority()).andReturn(priority).once();
EasyMock.expect(druidServerConfig.getMaxSize()).andReturn(maxSize).times(2);
EasyMock.expect(serverManager.getDataSourceCounts()).andReturn(ImmutableMap.of(dataSource, 1L));
EasyMock.expect(segmentManager.getDataSourceCounts()).andReturn(ImmutableMap.of(dataSource, 1L));
EasyMock.expect(druidServerConfig.getTier()).andReturn(tier).once();
EasyMock.expect(druidServerConfig.getPriority()).andReturn(priority).once();
final HistoricalMetricsMonitor monitor = new HistoricalMetricsMonitor(
druidServerConfig,
serverManager,
segmentManager,
zkCoordinator
);
@ -102,9 +102,9 @@ public class HistoricalMetricsMonitorTest extends EasyMockSupport
serviceEmitter.emit(EasyMock.capture(eventCapture));
EasyMock.expectLastCall().times(5);
EasyMock.replay(druidServerConfig, serverManager, zkCoordinator, serviceEmitter);
EasyMock.replay(druidServerConfig, segmentManager, zkCoordinator, serviceEmitter);
monitor.doMonitor(serviceEmitter);
EasyMock.verify(druidServerConfig, serverManager, zkCoordinator, serviceEmitter);
EasyMock.verify(druidServerConfig, segmentManager, zkCoordinator, serviceEmitter);
final String host = "host";
final String service = "service";

View File

@ -40,6 +40,7 @@ import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierC
import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierFactory;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.QueryResource;
import io.druid.server.coordination.ZkCoordinator;
import io.druid.server.http.SegmentListerResource;
import io.druid.server.initialization.jetty.JettyServerInitializer;
import io.druid.server.metrics.QueryCountStatsProvider;
@ -109,5 +110,8 @@ public class RealtimeModule implements Module
Jerseys.addResource(binder, SegmentListerResource.class);
LifecycleModule.register(binder, QueryResource.class);
LifecycleModule.register(binder, Server.class);
binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
LifecycleModule.register(binder, ZkCoordinator.class);
}
}

View File

@ -51,7 +51,7 @@ public class TestServerInventoryView implements TimelineServerView
@Override
public void registerSegmentCallback(Executor exec, final SegmentCallback callback)
{
final DruidServerMetadata dummyServer = new DruidServerMetadata("dummy", "dummy", 0, "dummy", "dummy", 0);
final DruidServerMetadata dummyServer = new DruidServerMetadata("dummy", "dummy", 0, "historical", "dummy", 0);
for (final DataSegment segment : segments) {
exec.execute(