Load broadcast datasources on broker and tasks (#9971)

* Load broadcast datasources on broker and tasks

* Add javadocs

* Support HTTP segment management

* Fix indexer maxSize

* inspection fix

* Make segment cache optional on non-historicals

* Fix build

* Fix inspections, some coverage, failed tests

* More tests

* Add CliIndexer to MainTest

* Fix inspection

* Rename UnprunedDataSegment to LoadableDataSegment

* Address PR comments

* Fix
This commit is contained in:
Jonathan Wei 2020-06-08 20:15:59 -07:00 committed by GitHub
parent 7f51e44b00
commit 771870ae2d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
55 changed files with 700 additions and 348 deletions

View File

@ -191,4 +191,10 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
{
return TYPE;
}
@Override
public boolean supportsQueries()
{
return true;
}
}

View File

@ -345,6 +345,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
INPUT_FORMAT
)
);
Assert.assertTrue(task.supportsQueries());
final ListenableFuture<TaskStatus> future = runTask(task);

View File

@ -137,6 +137,12 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String>
return TYPE;
}
@Override
public boolean supportsQueries()
{
return true;
}
@VisibleForTesting
AWSCredentialsConfig getAwsCredentialsConfig()
{

View File

@ -347,8 +347,8 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
null,
false
)
);
Assert.assertTrue(task.supportsQueries());
final ListenableFuture<TaskStatus> future = runTask(task);

View File

@ -145,6 +145,12 @@ public abstract class AbstractTask implements Task
return null;
}
@Override
public boolean supportsQueries()
{
return false;
}
@Override
public String getClasspathPrefix()
{

View File

@ -245,6 +245,12 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
return (queryPlus, responseContext) -> queryPlus.run(appenderator, responseContext);
}
@Override
public boolean supportsQueries()
{
return true;
}
@Override
public boolean isReady(TaskActionClient taskActionClient)
{

View File

@ -196,6 +196,12 @@ public class RealtimeIndexTask extends AbstractTask
return plumber.getQueryRunner(query);
}
@Override
public boolean supportsQueries()
{
return true;
}
@Override
public boolean isReady(TaskActionClient taskActionClient)
{

View File

@ -146,6 +146,11 @@ public interface Task
*/
<T> QueryRunner<T> getQueryRunner(Query<T> query);
/**
* @return true if this Task type is queryable, such as streaming ingestion tasks
*/
boolean supportsQueries();
/**
* Returns an extra classpath that should be prepended to the default classpath when running this task. If no
* extra classpath should be prepended, this should return null or the empty string.

View File

@ -327,6 +327,13 @@ public class ForkingTaskRunner
command.add(nodeType);
}
// If the task type is queryable, we need to load broadcast segments on the peon, used for
// join queries
if (task.supportsQueries()) {
command.add("--loadBroadcastSegments");
command.add("true");
}
if (!taskFile.exists()) {
jsonMapper.writeValue(taskFile, task);
}

View File

@ -337,6 +337,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
{
expectPublishedSegments(1);
final AppenderatorDriverRealtimeIndexTask task = makeRealtimeTask(null);
Assert.assertTrue(task.supportsQueries());
final ListenableFuture<TaskStatus> statusFuture = runTask(task);
// Wait for firehose to show up, it starts off null.

View File

@ -229,6 +229,8 @@ public class IndexTaskTest extends IngestionTestBase
appenderatorsManager
);
Assert.assertFalse(indexTask.supportsQueries());
final List<DataSegment> segments = runTask(indexTask).rhs;
Assert.assertEquals(2, segments.size());

View File

@ -189,6 +189,12 @@ public class RealtimeIndexTaskTest
Assert.assertEquals(task.getId(), task.getTaskResource().getAvailabilityGroup());
}
@Test(timeout = 60_000L)
public void testSupportsQueries()
{
final RealtimeIndexTask task = makeRealtimeTask(null);
Assert.assertTrue(task.supportsQueries());
}
@Test(timeout = 60_000L, expected = ExecutionException.class)
public void testHandoffTimeout() throws Exception

View File

@ -291,10 +291,6 @@
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
</dependency>
<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>

View File

@ -452,7 +452,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
Hasher hasher = Hashing.sha1().newHasher();
boolean hasOnlyHistoricalSegments = true;
for (SegmentServerSelector p : segments) {
if (!p.getServer().pick().getServer().segmentReplicatable()) {
if (!p.getServer().pick().getServer().isSegmentReplicationTarget()) {
hasOnlyHistoricalSegments = false;
break;
}
@ -633,7 +633,7 @@ public class CachingClusteredClient implements QuerySegmentWalker
if (isBySegment) {
serverResults = getBySegmentServerResults(serverRunner, segmentsOfServer, maxQueuedBytesPerServer);
} else if (!server.segmentReplicatable() || !populateCache) {
} else if (!server.isSegmentReplicationTarget() || !populateCache) {
serverResults = getSimpleServerResults(serverRunner, segmentsOfServer, maxQueuedBytesPerServer);
} else {
serverResults = getAndCacheServerResults(serverRunner, segmentsOfServer, maxQueuedBytesPerServer);

View File

@ -137,9 +137,19 @@ public class DruidServer implements Comparable<DruidServer>
return metadata.getTier();
}
public boolean segmentReplicatable()
public boolean isSegmentReplicationTarget()
{
return metadata.segmentReplicatable();
return metadata.isSegmentReplicationTarget();
}
public boolean isSegmentBroadcastTarget()
{
return metadata.isSegmentBroadcastTarget();
}
public boolean isSegmentReplicationOrBroadcastTarget()
{
return metadata.isSegmentReplicationTarget() || metadata.isSegmentBroadcastTarget();
}
@JsonProperty

View File

@ -44,7 +44,8 @@ public abstract class DruidNodeDiscoveryProvider
private static final Map<String, Set<NodeRole>> SERVICE_TO_NODE_TYPES = ImmutableMap.of(
LookupNodeService.DISCOVERY_SERVICE_KEY,
ImmutableSet.of(NodeRole.BROKER, NodeRole.HISTORICAL, NodeRole.PEON, NodeRole.INDEXER),
DataNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeRole.HISTORICAL, NodeRole.PEON, NodeRole.INDEXER),
DataNodeService.DISCOVERY_SERVICE_KEY,
ImmutableSet.of(NodeRole.HISTORICAL, NodeRole.PEON, NodeRole.INDEXER, NodeRole.BROKER),
WorkerNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeRole.MIDDLE_MANAGER, NodeRole.INDEXER)
);

View File

@ -23,9 +23,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.druid.utils.JvmUtils;
import org.hibernate.validator.constraints.NotEmpty;
import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -34,8 +34,7 @@ import java.util.concurrent.TimeUnit;
public class SegmentLoaderConfig
{
@JsonProperty
@NotEmpty
private List<StorageLocationConfig> locations = null;
private List<StorageLocationConfig> locations = Collections.emptyList();
@JsonProperty("lazyLoadOnStart")
private boolean lazyLoadOnStart = false;

View File

@ -89,7 +89,6 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
this.indexIO = indexIO;
this.config = config;
this.jsonMapper = mapper;
this.locations = new ArrayList<>();
for (StorageLocationConfig locationConfig : config.getLocations()) {
locations.add(

View File

@ -1,104 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.realtime;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
import java.io.IOException;
/**
* Ties the {@link DataSegmentServerAnnouncer} announce/unannounce to the lifecycle start and stop.
*
* Analogous to {@link org.apache.druid.server.coordination.SegmentLoadDropHandler} on the Historicals,
* but without segment cache management.
*/
@ManageLifecycle
public class CliIndexerDataSegmentServerAnnouncerLifecycleHandler
{
private static final EmittingLogger LOG = new EmittingLogger(CliIndexerDataSegmentServerAnnouncerLifecycleHandler.class);
private final DataSegmentServerAnnouncer dataSegmentServerAnnouncer;
private final LifecycleLock lifecycleLock = new LifecycleLock();
@Inject
public CliIndexerDataSegmentServerAnnouncerLifecycleHandler(
DataSegmentServerAnnouncer dataSegmentServerAnnouncer
)
{
this.dataSegmentServerAnnouncer = dataSegmentServerAnnouncer;
}
@LifecycleStart
public void start() throws IOException
{
if (!lifecycleLock.canStart()) {
throw new RuntimeException("Lifecycle lock could not start");
}
try {
if (lifecycleLock.isStarted()) {
return;
}
LOG.info("Starting...");
try {
dataSegmentServerAnnouncer.announce();
}
catch (Exception e) {
Throwables.propagateIfPossible(e, IOException.class);
throw new RuntimeException(e);
}
LOG.info("Started.");
lifecycleLock.started();
}
finally {
lifecycleLock.exitStart();
}
}
@LifecycleStop
public void stop()
{
if (!lifecycleLock.canStop()) {
throw new RuntimeException("Lifecycle lock could not stop");
}
if (!lifecycleLock.isStarted()) {
return;
}
LOG.info("Stopping...");
try {
dataSegmentServerAnnouncer.unannounce();
}
catch (Exception e) {
throw new RuntimeException(e);
}
LOG.info("Stopped.");
}
}

View File

@ -142,7 +142,7 @@ public class CoordinatorBasedSegmentHandoffNotifier implements SegmentHandoffNot
&& segmentLoadInfo.getSegment().getShardSpec().getPartitionNum()
== descriptor.getPartitionNumber()
&& segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0
&& segmentLoadInfo.getServers().stream().anyMatch(DruidServerMetadata::segmentReplicatable)) {
&& segmentLoadInfo.getServers().stream().anyMatch(DruidServerMetadata::isSegmentReplicationOrBroadcastTarget)) {
return true;
}
}

View File

@ -107,11 +107,21 @@ public class DruidServerMetadata
return priority;
}
public boolean segmentReplicatable()
public boolean isSegmentReplicationTarget()
{
return type.isSegmentReplicationTarget();
}
public boolean isSegmentBroadcastTarget()
{
return type.isSegmentBroadcastTarget();
}
public boolean isSegmentReplicationOrBroadcastTarget()
{
return isSegmentReplicationTarget() || isSegmentBroadcastTarget();
}
@Override
public boolean equals(Object o)
{

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordination;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.druid.jackson.CommaListJoinDeserializer;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
/**
* A deserialization aid used by {@link SegmentChangeRequestLoad}. The broker prunes the loadSpec from segments
* for efficiency reasons, but the broker does need the loadSpec when it loads broadcast segments.
*
* This class always uses the non-pruning default {@link PruneSpecsHolder}.
*/
public class LoadableDataSegment extends DataSegment
{
@JsonCreator
public LoadableDataSegment(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("version") String version,
// use `Map` *NOT* `LoadSpec` because we want to do lazy materialization to prevent dependency pollution
@JsonProperty("loadSpec") @Nullable Map<String, Object> loadSpec,
@JsonProperty("dimensions")
@JsonDeserialize(using = CommaListJoinDeserializer.class)
@Nullable
List<String> dimensions,
@JsonProperty("metrics")
@JsonDeserialize(using = CommaListJoinDeserializer.class)
@Nullable
List<String> metrics,
@JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
@JsonProperty("lastCompactionState") @Nullable CompactionState lastCompactionState,
@JsonProperty("binaryVersion") Integer binaryVersion,
@JsonProperty("size") long size,
@JacksonInject PruneSpecsHolder pruneSpecsHolder
)
{
super(
dataSource,
interval,
version,
loadSpec,
dimensions,
metrics,
shardSpec,
lastCompactionState,
binaryVersion,
size,
PruneSpecsHolder.DEFAULT
);
}
}

View File

@ -35,14 +35,26 @@ public class SegmentChangeRequestLoad implements DataSegmentChangeRequest
{
private final DataSegment segment;
/**
* To avoid pruning of the loadSpec on the broker, needed when the broker is loading broadcast segments,
* we deserialize into an {@link LoadableDataSegment}, which never removes the loadSpec.
*/
@JsonCreator
public SegmentChangeRequestLoad(
@JsonUnwrapped DataSegment segment
@JsonUnwrapped LoadableDataSegment segment
)
{
this.segment = segment;
}
public SegmentChangeRequestLoad(
DataSegment segment
)
{
this.segment = segment;
}
@Override
public void go(DataSegmentChangeHandler handler, @Nullable DataSegmentChangeCallback callback)
{

View File

@ -34,6 +34,8 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
@ -104,7 +106,8 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
SegmentLoaderConfig config,
DataSegmentAnnouncer announcer,
DataSegmentServerAnnouncer serverAnnouncer,
SegmentManager segmentManager
SegmentManager segmentManager,
ServerTypeConfig serverTypeConfig
)
{
this(
@ -116,7 +119,8 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
Executors.newScheduledThreadPool(
config.getNumLoadingThreads(),
Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
)
),
serverTypeConfig
);
}
@ -127,7 +131,8 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
DataSegmentAnnouncer announcer,
DataSegmentServerAnnouncer serverAnnouncer,
SegmentManager segmentManager,
ScheduledExecutorService exec
ScheduledExecutorService exec,
ServerTypeConfig serverTypeConfig
)
{
this.jsonMapper = jsonMapper;
@ -139,6 +144,13 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
this.exec = exec;
this.segmentsToDelete = new ConcurrentSkipListSet<>();
if (config.getLocations().isEmpty()) {
if (ServerType.HISTORICAL.equals(serverTypeConfig.getServerType())) {
throw new IAE("Segment cache locations must be set on historicals.");
} else {
log.info("Not starting SegmentLoadDropHandler with empty segment cache locations.");
}
}
requestStatuses = CacheBuilder.newBuilder().maximumSize(config.getStatusQueueMaxSize()).initialCapacity(8).build();
}
@ -152,8 +164,10 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
log.info("Starting...");
try {
loadLocalCache();
serverAnnouncer.announce();
if (!config.getLocations().isEmpty()) {
loadLocalCache();
serverAnnouncer.announce();
}
}
catch (Exception e) {
Throwables.propagateIfPossible(e, IOException.class);
@ -174,7 +188,9 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
log.info("Stopping...");
try {
serverAnnouncer.unannounce();
if (!config.getLocations().isEmpty()) {
serverAnnouncer.unannounce();
}
}
catch (Exception e) {
throw new RuntimeException(e);

View File

@ -63,6 +63,14 @@ public enum ServerType
{
return false;
}
},
BROKER {
@Override
public boolean isSegmentReplicationTarget()
{
return false;
}
};
/**

View File

@ -26,6 +26,7 @@ import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
/**
* This interface describes the coordinator balancing strategy, which is responsible for making decisions on where
@ -56,11 +57,17 @@ public interface BalancerStrategy
/**
* Pick the best segment to move from one of the supplied set of servers according to the balancing strategy.
* @param serverHolders set of historicals to consider for moving segments
* @param broadcastDatasources Datasources that contain segments which were loaded via broadcast rules.
* Balancing strategies should avoid rebalancing segments for such datasources, since
* they should be loaded on all servers anyway.
* NOTE: this should really be handled on a per-segment basis, to properly support
* the interval or period-based broadcast rules. For simplicity of the initial
* implementation, only forever broadcast rules are supported.
* @return {@link BalancerSegmentHolder} containing segment to move and server it currently resides on, or null if
* there are no segments to pick from (i. e. all provided serverHolders are empty).
*/
@Nullable
BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders);
BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders, Set<String> broadcastDatasources);
/**
* Returns an iterator for a set of servers to drop from, ordered by preference of which server to drop from first

View File

@ -71,7 +71,7 @@ public class CachingCostBalancerStrategyFactory implements BalancerStrategyFacto
@Override
public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
{
if (server.segmentReplicatable()) {
if (server.isSegmentReplicationTarget()) {
clusterCostCacheBuilder.addSegment(server.getName(), segment);
}
return ServerView.CallbackAction.CONTINUE;
@ -80,7 +80,7 @@ public class CachingCostBalancerStrategyFactory implements BalancerStrategyFacto
@Override
public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment)
{
if (server.segmentReplicatable()) {
if (server.isSegmentReplicationTarget()) {
clusterCostCacheBuilder.removeSegment(server.getName(), segment);
}
return ServerView.CallbackAction.CONTINUE;
@ -98,7 +98,7 @@ public class CachingCostBalancerStrategyFactory implements BalancerStrategyFacto
serverInventoryView.registerServerRemovedCallback(
executor,
server -> {
if (server.segmentReplicatable()) {
if (server.isSegmentReplicationTarget()) {
clusterCostCacheBuilder.removeServer(server.getName());
}
return ServerView.CallbackAction.CONTINUE;

View File

@ -35,6 +35,7 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
@ -211,9 +212,12 @@ public class CostBalancerStrategy implements BalancerStrategy
@Override
public BalancerSegmentHolder pickSegmentToMove(final List<ServerHolder> serverHolders)
public BalancerSegmentHolder pickSegmentToMove(
final List<ServerHolder> serverHolders,
Set<String> broadcastDatasources
)
{
return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(serverHolders);
return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(serverHolders, broadcastDatasources);
}
@Override

View File

@ -47,24 +47,28 @@ public class DruidCluster
@VisibleForTesting
static DruidCluster createDruidClusterFromBuilderInTest(
@Nullable Set<ServerHolder> realtimes,
Map<String, Iterable<ServerHolder>> historicals
Map<String, Iterable<ServerHolder>> historicals,
@Nullable Set<ServerHolder> brokers
)
{
return new DruidCluster(realtimes, historicals);
return new DruidCluster(realtimes, historicals, brokers);
}
private final Set<ServerHolder> realtimes;
private final Map<String, NavigableSet<ServerHolder>> historicals;
private final Set<ServerHolder> brokers;
public DruidCluster()
{
this.realtimes = new HashSet<>();
this.historicals = new HashMap<>();
this.brokers = new HashSet<>();
}
private DruidCluster(
@Nullable Set<ServerHolder> realtimes,
Map<String, Iterable<ServerHolder>> historicals
Map<String, Iterable<ServerHolder>> historicals,
@Nullable Set<ServerHolder> brokers
)
{
this.realtimes = realtimes == null ? new HashSet<>() : new HashSet<>(realtimes);
@ -72,6 +76,7 @@ public class DruidCluster
historicals,
holders -> CollectionUtils.newTreeSet(Comparator.reverseOrder(), holders)
);
this.brokers = brokers == null ? new HashSet<>() : new HashSet<>(brokers);
}
public void add(ServerHolder serverHolder)
@ -87,7 +92,11 @@ public class DruidCluster
addHistorical(serverHolder);
break;
case INDEXER_EXECUTOR:
throw new IAE("unsupported server type[%s]", serverHolder.getServer().getType());
addRealtime(serverHolder);
break;
case BROKER:
addBroker(serverHolder);
break;
default:
throw new IAE("unknown server type[%s]", serverHolder.getServer().getType());
}
@ -108,6 +117,11 @@ public class DruidCluster
tierServers.add(serverHolder);
}
private void addBroker(ServerHolder serverHolder)
{
brokers.add(serverHolder);
}
public Set<ServerHolder> getRealtimes()
{
return realtimes;
@ -118,6 +132,12 @@ public class DruidCluster
return historicals;
}
public Set<ServerHolder> getBrokers()
{
return brokers;
}
public Iterable<String> getTierNames()
{
return historicals.keySet();
@ -135,6 +155,7 @@ public class DruidCluster
final List<ServerHolder> allServers = new ArrayList<>(historicalSize + realtimeSize);
historicals.values().forEach(allServers::addAll);
allServers.addAll(brokers);
allServers.addAll(realtimes);
return allServers;
}
@ -146,7 +167,7 @@ public class DruidCluster
public boolean isEmpty()
{
return historicals.isEmpty() && realtimes.isEmpty();
return historicals.isEmpty() && realtimes.isEmpty() && brokers.isEmpty();
}
public boolean hasHistoricals()
@ -159,9 +180,19 @@ public class DruidCluster
return !realtimes.isEmpty();
}
public boolean hasBrokers()
{
return !brokers.isEmpty();
}
public boolean hasTier(String tier)
{
NavigableSet<ServerHolder> servers = historicals.get(tier);
return (servers != null) && !servers.isEmpty();
NavigableSet<ServerHolder> historicalServers = historicals.get(tier);
boolean historicalsHasTier = (historicalServers != null) && !historicalServers.isEmpty();
if (historicalsHasTier) {
return true;
}
return false;
}
}

View File

@ -761,7 +761,7 @@ public class DruidCoordinator
List<ImmutableDruidServer> currentServers = serverInventoryView
.getInventory()
.stream()
.filter(DruidServer::segmentReplicatable)
.filter(DruidServer::isSegmentReplicationOrBroadcastTarget)
.map(DruidServer::toImmutableDruidServer)
.collect(Collectors.toList());

View File

@ -34,7 +34,9 @@ import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
@ -70,6 +72,7 @@ public class DruidCoordinatorRuntimeParams
private final CoordinatorStats stats;
private final DateTime balancerReferenceTimestamp;
private final BalancerStrategy balancerStrategy;
private final Set<String> broadcastDatasources;
private DruidCoordinatorRuntimeParams(
long startTimeNanos,
@ -85,7 +88,8 @@ public class DruidCoordinatorRuntimeParams
CoordinatorCompactionConfig coordinatorCompactionConfig,
CoordinatorStats stats,
DateTime balancerReferenceTimestamp,
BalancerStrategy balancerStrategy
BalancerStrategy balancerStrategy,
Set<String> broadcastDatasources
)
{
this.startTimeNanos = startTimeNanos;
@ -102,6 +106,7 @@ public class DruidCoordinatorRuntimeParams
this.stats = stats;
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
this.balancerStrategy = balancerStrategy;
this.broadcastDatasources = broadcastDatasources;
}
public long getStartTimeNanos()
@ -180,6 +185,11 @@ public class DruidCoordinatorRuntimeParams
return balancerStrategy;
}
public Set<String> getBroadcastDatasources()
{
return broadcastDatasources;
}
public boolean coordinatorIsLeadingEnoughTimeToMarkAsUnusedOvershadowedSegements()
{
long nanosElapsedSinceCoordinatorStart = System.nanoTime() - getStartTimeNanos();
@ -256,6 +266,7 @@ public class DruidCoordinatorRuntimeParams
private CoordinatorStats stats;
private DateTime balancerReferenceTimestamp;
private BalancerStrategy balancerStrategy;
private Set<String> broadcastDatasources;
private Builder()
{
@ -272,6 +283,7 @@ public class DruidCoordinatorRuntimeParams
this.coordinatorDynamicConfig = CoordinatorDynamicConfig.builder().build();
this.coordinatorCompactionConfig = CoordinatorCompactionConfig.empty();
this.balancerReferenceTimestamp = DateTimes.nowUtc();
this.broadcastDatasources = new HashSet<>();
}
Builder(
@ -324,7 +336,8 @@ public class DruidCoordinatorRuntimeParams
coordinatorCompactionConfig,
stats,
balancerReferenceTimestamp,
balancerStrategy
balancerStrategy,
broadcastDatasources
);
}
@ -436,5 +449,11 @@ public class DruidCoordinatorRuntimeParams
this.balancerStrategy = balancerStrategy;
return this;
}
public Builder withBroadcastDatasources(Set<String> broadcastDatasources)
{
this.broadcastDatasources = broadcastDatasources;
return this;
}
}
}

View File

@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
public class RandomBalancerStrategy implements BalancerStrategy
@ -51,9 +52,9 @@ public class RandomBalancerStrategy implements BalancerStrategy
}
@Override
public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders)
public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders, Set<String> broadcastDatasources)
{
return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(serverHolders);
return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(serverHolders, broadcastDatasources);
}
@Override

View File

@ -22,19 +22,33 @@ package org.apache.druid.server.coordinator;
import org.apache.druid.timeline.DataSegment;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
final class ReservoirSegmentSampler
{
static BalancerSegmentHolder getRandomBalancerSegmentHolder(final List<ServerHolder> serverHolders)
static BalancerSegmentHolder getRandomBalancerSegmentHolder(
final List<ServerHolder> serverHolders,
Set<String> broadcastDatasources
)
{
ServerHolder fromServerHolder = null;
DataSegment proposalSegment = null;
int numSoFar = 0;
for (ServerHolder server : serverHolders) {
if (!server.getServer().getType().isSegmentReplicationTarget()) {
// if the server only handles broadcast segments (which don't need to be rebalanced), we have nothing to do
continue;
}
for (DataSegment segment : server.getServer().iterateAllSegments()) {
if (broadcastDatasources.contains(segment.getDataSource())) {
// we don't need to rebalance segments that were assigned via broadcast rules
continue;
}
int randNum = ThreadLocalRandom.current().nextInt(numSoFar + 1);
// w.p. 1 / (numSoFar+1), swap out the server and segment
if (randNum == numSoFar) {

View File

@ -122,6 +122,11 @@ public class ServerHolder implements Comparable<ServerHolder>
return peon.getSegmentsToLoad().contains(segment);
}
public boolean isDroppingSegment(DataSegment segment)
{
return peon.getSegmentsToDrop().contains(segment);
}
public int getNumberOfSegmentsInQueue()
{
return peon.getNumberOfSegmentsInQueue();

View File

@ -187,7 +187,10 @@ public class BalanceSegments implements CoordinatorDuty
//noinspection ForLoopThatDoesntUseLoopVariable
for (int iter = 0; (moved + unmoved) < maxSegmentsToMove; ++iter) {
final BalancerSegmentHolder segmentToMoveHolder = strategy.pickSegmentToMove(toMoveFrom);
final BalancerSegmentHolder segmentToMoveHolder = strategy.pickSegmentToMove(
toMoveFrom,
params.getBroadcastDatasources()
);
if (segmentToMoveHolder == null) {
log.info("All servers to move segments from are empty, ending run.");
break;

View File

@ -28,11 +28,13 @@ import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ReplicationThrottler;
import org.apache.druid.server.coordinator.rules.BroadcastDistributionRule;
import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.DateTime;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@ -101,6 +103,7 @@ public class RunRules implements CoordinatorDuty
final List<SegmentId> segmentsWithMissingRules = Lists.newArrayListWithCapacity(MAX_MISSING_RULES);
int missingRules = 0;
final Set<String> broadcastDatasources = new HashSet<>();
for (DataSegment segment : params.getUsedSegments()) {
if (overshadowed.contains(segment.getId())) {
// Skipping overshadowed segments
@ -112,6 +115,12 @@ public class RunRules implements CoordinatorDuty
if (rule.appliesTo(segment, now)) {
stats.accumulate(rule.run(coordinator, paramsWithReplicationManager, segment));
foundMatchingRule = true;
// The set of broadcast datasources is used by BalanceSegments, so it's important that RunRules
// executes before BalanceSegments
if (rule instanceof BroadcastDistributionRule) {
broadcastDatasources.add(segment.getDataSource());
}
break;
}
}
@ -131,6 +140,9 @@ public class RunRules implements CoordinatorDuty
.emit();
}
return params.buildFromExisting().withCoordinatorStats(stats).build();
return params.buildFromExisting()
.withCoordinatorStats(stats)
.withBroadcastDatasources(broadcastDatasources)
.build();
}
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.server.coordinator.rules;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
@ -27,8 +28,8 @@ import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.timeline.DataSegment;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
public abstract class BroadcastDistributionRule implements Rule
{
@ -37,30 +38,35 @@ public abstract class BroadcastDistributionRule implements Rule
@Override
public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntimeParams params, DataSegment segment)
{
// Find servers which holds the segments of co-located data source
final Set<ServerHolder> loadServerHolders = new HashSet<>();
final Set<ServerHolder> dropServerHolders = new HashSet<>();
final List<String> colocatedDataSources = getColocatedDataSources();
if (colocatedDataSources == null || colocatedDataSources.isEmpty()) {
loadServerHolders.addAll(params.getDruidCluster().getAllServers());
} else {
params.getDruidCluster().getAllServers().forEach(
eachHolder -> {
if (!eachHolder.isDecommissioning()
&& colocatedDataSources.stream()
.anyMatch(source -> eachHolder.getServer().getDataSource(source) != null)) {
loadServerHolders.add(eachHolder);
} else if (eachHolder.isServingSegment(segment)) {
if (!eachHolder.getPeon().getSegmentsToDrop().contains(segment)) {
dropServerHolders.add(eachHolder);
}
}
}
);
}
// Find servers where we need to load the broadcast segments
final Set<ServerHolder> loadServerHolders =
params.getDruidCluster().getAllServers()
.stream()
.filter(
(serverHolder) -> {
ServerType serverType = serverHolder.getServer().getType();
if (!serverType.isSegmentBroadcastTarget()) {
return false;
}
final boolean isServingSegment =
serverHolder.isServingSegment(segment);
if (serverHolder.isDecommissioning()) {
if (isServingSegment && !serverHolder.isDroppingSegment(segment)) {
dropServerHolders.add(serverHolder);
}
return false;
}
return !isServingSegment && !serverHolder.isLoadingSegment(segment);
}
)
.collect(Collectors.toSet());
final CoordinatorStats stats = new CoordinatorStats();
return stats.accumulate(assign(loadServerHolders, segment))
.accumulate(drop(dropServerHolders, segment));
}
@ -110,6 +116,4 @@ public abstract class BroadcastDistributionRule implements Rule
return stats;
}
public abstract List<String> getColocatedDataSources();
}

View File

@ -25,21 +25,16 @@ import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.List;
import java.util.Objects;
public class ForeverBroadcastDistributionRule extends BroadcastDistributionRule
{
static final String TYPE = "broadcastForever";
private final List<String> colocatedDataSources;
@JsonCreator
public ForeverBroadcastDistributionRule(
@JsonProperty("colocatedDataSources") List<String> colocatedDataSources
)
public ForeverBroadcastDistributionRule()
{
this.colocatedDataSources = colocatedDataSources;
}
@Override
@ -49,13 +44,6 @@ public class ForeverBroadcastDistributionRule extends BroadcastDistributionRule
return TYPE;
}
@Override
@JsonProperty
public List<String> getColocatedDataSources()
{
return colocatedDataSources;
}
@Override
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{
@ -79,13 +67,12 @@ public class ForeverBroadcastDistributionRule extends BroadcastDistributionRule
return false;
}
ForeverBroadcastDistributionRule that = (ForeverBroadcastDistributionRule) o;
return Objects.equals(colocatedDataSources, that.colocatedDataSources);
return true;
}
@Override
public int hashCode()
{
return Objects.hash(getType(), colocatedDataSources);
return Objects.hash(getType());
}
}

View File

@ -25,23 +25,19 @@ import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.List;
import java.util.Objects;
public class IntervalBroadcastDistributionRule extends BroadcastDistributionRule
{
static final String TYPE = "broadcastByInterval";
private final Interval interval;
private final List<String> colocatedDataSources;
@JsonCreator
public IntervalBroadcastDistributionRule(
@JsonProperty("interval") Interval interval,
@JsonProperty("colocatedDataSources") List<String> colocatedDataSources
@JsonProperty("interval") Interval interval
)
{
this.interval = interval;
this.colocatedDataSources = colocatedDataSources;
}
@Override
@ -51,13 +47,6 @@ public class IntervalBroadcastDistributionRule extends BroadcastDistributionRule
return TYPE;
}
@Override
@JsonProperty
public List<String> getColocatedDataSources()
{
return colocatedDataSources;
}
@Override
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{
@ -79,26 +68,19 @@ public class IntervalBroadcastDistributionRule extends BroadcastDistributionRule
@Override
public boolean equals(Object o)
{
if (o == this) {
if (this == o) {
return true;
}
if (o == null || o.getClass() != getClass()) {
if (o == null || getClass() != o.getClass()) {
return false;
}
IntervalBroadcastDistributionRule that = (IntervalBroadcastDistributionRule) o;
if (!Objects.equals(interval, that.interval)) {
return false;
}
return Objects.equals(colocatedDataSources, that.colocatedDataSources);
return Objects.equals(getInterval(), that.getInterval());
}
@Override
public int hashCode()
{
return Objects.hash(getType(), interval, colocatedDataSources);
return Objects.hash(getInterval());
}
}

View File

@ -26,7 +26,6 @@ import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import java.util.List;
import java.util.Objects;
public class PeriodBroadcastDistributionRule extends BroadcastDistributionRule
@ -36,18 +35,15 @@ public class PeriodBroadcastDistributionRule extends BroadcastDistributionRule
private final Period period;
private final boolean includeFuture;
private final List<String> colocatedDataSources;
@JsonCreator
public PeriodBroadcastDistributionRule(
@JsonProperty("period") Period period,
@JsonProperty("includeFuture") Boolean includeFuture,
@JsonProperty("colocatedDataSources") List<String> colocatedDataSources
@JsonProperty("includeFuture") Boolean includeFuture
)
{
this.period = period;
this.includeFuture = includeFuture == null ? DEFAULT_INCLUDE_FUTURE : includeFuture;
this.colocatedDataSources = colocatedDataSources;
}
@Override
@ -57,13 +53,6 @@ public class PeriodBroadcastDistributionRule extends BroadcastDistributionRule
return TYPE;
}
@Override
@JsonProperty
public List<String> getColocatedDataSources()
{
return colocatedDataSources;
}
@Override
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{
@ -94,25 +83,17 @@ public class PeriodBroadcastDistributionRule extends BroadcastDistributionRule
if (this == o) {
return true;
}
if (o == null || o.getClass() != getClass()) {
if (o == null || getClass() != o.getClass()) {
return false;
}
PeriodBroadcastDistributionRule that = (PeriodBroadcastDistributionRule) o;
if (!Objects.equals(period, that.period)) {
return false;
}
if (includeFuture != that.includeFuture) {
return false;
}
return Objects.equals(colocatedDataSources, that.colocatedDataSources);
return isIncludeFuture() == that.isIncludeFuture() &&
Objects.equals(getPeriod(), that.getPeriod());
}
@Override
public int hashCode()
{
return Objects.hash(getType(), period, colocatedDataSources);
return Objects.hash(getPeriod(), isIncludeFuture());
}
}

View File

@ -769,7 +769,7 @@ public class DataSourcesResource
&& segmentLoadInfo.getSegment().getShardSpec().getPartitionNum() == descriptor.getPartitionNumber()
&& segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0
&& Iterables.any(
segmentLoadInfo.getServers(), DruidServerMetadata::segmentReplicatable
segmentLoadInfo.getServers(), DruidServerMetadata::isSegmentReplicationTarget
)) {
return true;
}

View File

@ -174,7 +174,7 @@ public class CoordinatorBasedSegmentHandoffNotifierTest
)
);
Assert.assertFalse(
Assert.assertTrue(
CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
Collections.singletonList(
new ImmutableSegmentLoadInfo(

View File

@ -24,6 +24,8 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
@ -32,6 +34,7 @@ import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.loading.CacheTestSegmentLoader;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
@ -39,12 +42,16 @@ import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@ -67,6 +74,7 @@ public class SegmentLoadDropHandlerTest
private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
private SegmentLoadDropHandler segmentLoadDropHandler;
private DataSegmentAnnouncer announcer;
private File infoDir;
private AtomicInteger announceCount;
@ -74,22 +82,36 @@ public class SegmentLoadDropHandlerTest
private CacheTestSegmentLoader segmentLoader;
private SegmentManager segmentManager;
private List<Runnable> scheduledRunnable;
private SegmentLoaderConfig segmentLoaderConfig;
private SegmentLoaderConfig segmentLoaderConfigNoLocations;
private ScheduledExecutorFactory scheduledExecutorFactory;
private List<StorageLocationConfig> locations;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Before
public void setUp()
{
try {
infoDir = new File(File.createTempFile("blah", "blah2").getParent(), "ZkCoordinatorTest");
infoDir.mkdirs();
for (File file : infoDir.listFiles()) {
file.delete();
}
infoDir = temporaryFolder.newFolder();
log.info("Creating tmp test files in [%s]", infoDir);
}
catch (IOException e) {
throw new RuntimeException(e);
}
locations = Collections.singletonList(
new StorageLocationConfig(
infoDir,
100L,
100d
)
);
scheduledRunnable = new ArrayList<>();
segmentLoader = new CacheTestSegmentLoader();
@ -132,57 +154,91 @@ public class SegmentLoadDropHandlerTest
}
};
segmentLoadDropHandler = new SegmentLoadDropHandler(
jsonMapper,
new SegmentLoaderConfig()
{
@Override
public File getInfoDir()
{
return infoDir;
}
@Override
public int getNumLoadingThreads()
{
return 5;
}
segmentLoaderConfig = new SegmentLoaderConfig()
{
@Override
public File getInfoDir()
{
return infoDir;
}
@Override
public int getAnnounceIntervalMillis()
{
return 50;
}
@Override
public int getNumLoadingThreads()
{
return 5;
}
@Override
public int getDropSegmentDelayMillis()
{
return 0;
}
},
announcer,
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
segmentManager,
new ScheduledExecutorFactory()
{
@Override
public ScheduledExecutorService create(int corePoolSize, String nameFormat)
{
@Override
public int getAnnounceIntervalMillis()
{
return 50;
}
@Override
public List<StorageLocationConfig> getLocations()
{
return locations;
}
@Override
public int getDropSegmentDelayMillis()
{
return 0;
}
};
segmentLoaderConfigNoLocations = new SegmentLoaderConfig()
{
@Override
public int getNumLoadingThreads()
{
return 5;
}
@Override
public int getAnnounceIntervalMillis()
{
return 50;
}
@Override
public int getDropSegmentDelayMillis()
{
return 0;
}
};
scheduledExecutorFactory = new ScheduledExecutorFactory()
{
@Override
public ScheduledExecutorService create(int corePoolSize, String nameFormat)
{
/*
Override normal behavoir by adding the runnable to a list so that you can make sure
all the shceduled runnables are executed by explicitly calling run() on each item in the list
*/
return new ScheduledThreadPoolExecutor(corePoolSize, Execs.makeThreadFactory(nameFormat))
{
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
{
scheduledRunnable.add(command);
return null;
}
};
return new ScheduledThreadPoolExecutor(corePoolSize, Execs.makeThreadFactory(nameFormat))
{
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
{
scheduledRunnable.add(command);
return null;
}
}.create(5, "SegmentLoadDropHandlerTest-[%d]")
};
}
};
segmentLoadDropHandler = new SegmentLoadDropHandler(
jsonMapper,
segmentLoaderConfig,
announcer,
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
segmentManager,
scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"),
new ServerTypeConfig(ServerType.HISTORICAL)
);
}
@ -220,6 +276,40 @@ public class SegmentLoadDropHandlerTest
segmentLoadDropHandler.stop();
}
@Test
public void testSegmentLoading1BrokerWithNoLocations() throws Exception
{
SegmentLoadDropHandler segmentLoadDropHandlerBrokerWithNoLocations = new SegmentLoadDropHandler(
jsonMapper,
segmentLoaderConfigNoLocations,
announcer,
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
segmentManager,
scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-brokerNoLocations-[%d]"),
new ServerTypeConfig(ServerType.BROKER)
);
segmentLoadDropHandlerBrokerWithNoLocations.start();
segmentLoadDropHandler.stop();
}
@Test
public void testSegmentLoading1HistoricalWithNoLocations()
{
expectedException.expect(IAE.class);
expectedException.expectMessage("Segment cache locations must be set on historicals.");
new SegmentLoadDropHandler(
jsonMapper,
segmentLoaderConfigNoLocations,
announcer,
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
segmentManager,
scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"),
new ServerTypeConfig(ServerType.HISTORICAL)
);
}
/**
* Steps:
* 1. addSegment() succesfully loads the segment and annouces it
@ -382,13 +472,20 @@ public class SegmentLoadDropHandlerTest
return 5;
}
@Override
public List<StorageLocationConfig> getLocations()
{
return locations;
}
@Override
public int getAnnounceIntervalMillis()
{
return 50;
}
},
announcer, EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), segmentManager
announcer, EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), segmentManager,
new ServerTypeConfig(ServerType.HISTORICAL)
);
Set<DataSegment> segments = new HashSet<>();

View File

@ -23,10 +23,13 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.curator.CuratorTestBase;
import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.ServerTestHelper;
import org.apache.druid.server.initialization.ZkPathsConfig;
@ -37,9 +40,15 @@ import org.apache.zookeeper.CreateMode;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
@ -47,6 +56,8 @@ import java.util.concurrent.ScheduledExecutorService;
*/
public class ZkCoordinatorTest extends CuratorTestBase
{
private static final Logger log = new Logger(ZkCoordinatorTest.class);
private final ObjectMapper jsonMapper = ServerTestHelper.MAPPER;
private final DruidServerMetadata me = new DruidServerMetadata(
"dummyServer",
@ -67,9 +78,31 @@ public class ZkCoordinatorTest extends CuratorTestBase
};
private ZkCoordinator zkCoordinator;
private File infoDir;
private List<StorageLocationConfig> locations;
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Before
public void setUp() throws Exception
{
try {
infoDir = temporaryFolder.newFolder();
log.info("Creating tmp test files in [%s]", infoDir);
}
catch (IOException e) {
throw new RuntimeException(e);
}
locations = Collections.singletonList(
new StorageLocationConfig(
infoDir,
100L,
100d
)
);
setupServerAndCurator();
curator.start();
curator.blockUntilConnected();
@ -102,11 +135,42 @@ public class ZkCoordinatorTest extends CuratorTestBase
SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler(
ServerTestHelper.MAPPER,
new SegmentLoaderConfig(),
new SegmentLoaderConfig() {
@Override
public File getInfoDir()
{
return infoDir;
}
@Override
public int getNumLoadingThreads()
{
return 5;
}
@Override
public int getAnnounceIntervalMillis()
{
return 50;
}
@Override
public List<StorageLocationConfig> getLocations()
{
return locations;
}
@Override
public int getDropSegmentDelayMillis()
{
return 0;
}
},
EasyMock.createNiceMock(DataSegmentAnnouncer.class),
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
EasyMock.createNiceMock(SegmentManager.class),
EasyMock.createNiceMock(ScheduledExecutorService.class)
EasyMock.createNiceMock(ScheduledExecutorService.class),
new ServerTypeConfig(ServerType.HISTORICAL)
)
{
@Override

View File

@ -43,6 +43,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@ -66,9 +67,11 @@ public class BalanceSegmentsTest
private DataSegment segment2;
private DataSegment segment3;
private DataSegment segment4;
private DataSegment segment5;
private List<DataSegment> segments;
private ListeningExecutorService balancerStrategyExecutor;
private BalancerStrategy balancerStrategy;
private Set<String> broadcastDatasources;
@Before
public void setUp()
@ -82,6 +85,7 @@ public class BalanceSegmentsTest
segment2 = EasyMock.createMock(DataSegment.class);
segment3 = EasyMock.createMock(DataSegment.class);
segment4 = EasyMock.createMock(DataSegment.class);
segment5 = EasyMock.createMock(DataSegment.class);
DateTime start1 = DateTimes.of("2012-01-01");
DateTime start2 = DateTimes.of("2012-02-01");
@ -130,12 +134,24 @@ public class BalanceSegmentsTest
0,
8L
);
segment5 = new DataSegment(
"datasourceBroadcast",
new Interval(start2, start2.plusHours(1)),
version.toString(),
new HashMap<>(),
new ArrayList<>(),
new ArrayList<>(),
NoneShardSpec.instance(),
0,
8L
);
segments = new ArrayList<>();
segments.add(segment1);
segments.add(segment2);
segments.add(segment3);
segments.add(segment4);
segments.add(segment5);
peon1 = new LoadQueuePeonTester();
peon2 = new LoadQueuePeonTester();
@ -147,6 +163,8 @@ public class BalanceSegmentsTest
balancerStrategyExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(balancerStrategyExecutor);
broadcastDatasources = Collections.singleton("datasourceBroadcast");
}
@After
@ -187,10 +205,11 @@ public class BalanceSegmentsTest
ImmutableList.of(peon1, peon2)
)
.withBalancerStrategy(predefinedPickOrderStrategy)
.withBroadcastDatasources(broadcastDatasources)
.build();
params = new BalanceSegmentsTester(coordinator).run(params);
Assert.assertEquals(2, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
Assert.assertEquals(3, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
}
/**
@ -213,10 +232,10 @@ public class BalanceSegmentsTest
mockCoordinator(coordinator);
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
EasyMock.expect(strategy.pickSegmentToMove(ImmutableList.of(new ServerHolder(druidServer2, peon2, false))))
EasyMock.expect(strategy.pickSegmentToMove(ImmutableList.of(new ServerHolder(druidServer2, peon2, false)), broadcastDatasources))
.andReturn(new BalancerSegmentHolder(druidServer2, segment3))
.andReturn(new BalancerSegmentHolder(druidServer2, segment4));
EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject()))
EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(new BalancerSegmentHolder(druidServer1, segment1))
.andReturn(new BalancerSegmentHolder(druidServer1, segment2));
@ -237,6 +256,7 @@ public class BalanceSegmentsTest
.build() // ceil(3 * 0.6) = 2 segments from decommissioning servers
)
.withBalancerStrategy(strategy)
.withBroadcastDatasources(broadcastDatasources)
.build();
params = new BalanceSegmentsTester(coordinator).run(params);
@ -280,7 +300,7 @@ public class BalanceSegmentsTest
mockCoordinator(coordinator);
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject()))
EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(new BalancerSegmentHolder(druidServer1, segment1))
.andReturn(new BalancerSegmentHolder(druidServer1, segment2))
.andReturn(new BalancerSegmentHolder(druidServer2, segment3))
@ -303,6 +323,7 @@ public class BalanceSegmentsTest
.build()
)
.withBalancerStrategy(strategy)
.withBroadcastDatasources(broadcastDatasources)
.build();
params = new BalanceSegmentsTester(coordinator).run(params);
@ -328,7 +349,7 @@ public class BalanceSegmentsTest
mockCoordinator(coordinator);
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject()))
EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(new BalancerSegmentHolder(druidServer1, segment1))
.anyTimes();
EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())).andAnswer(() -> {
@ -343,6 +364,7 @@ public class BalanceSegmentsTest
ImmutableList.of(false, true)
)
.withBalancerStrategy(strategy)
.withBroadcastDatasources(broadcastDatasources)
.build();
params = new BalanceSegmentsTester(coordinator).run(params);
@ -362,7 +384,7 @@ public class BalanceSegmentsTest
ServerHolder holder2 = new ServerHolder(druidServer2, peon2, false);
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject()))
EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(new BalancerSegmentHolder(druidServer1, segment1))
.once();
EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject()))
@ -377,6 +399,7 @@ public class BalanceSegmentsTest
)
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(1).build())
.withBalancerStrategy(strategy)
.withBroadcastDatasources(broadcastDatasources)
.build();
params = new BalanceSegmentsTester(coordinator).run(params);
@ -412,6 +435,7 @@ public class BalanceSegmentsTest
ImmutableList.of(peon1, peon2)
)
.withBalancerStrategy(predefinedPickOrderStrategy)
.withBroadcastDatasources(broadcastDatasources)
.withDynamicConfigs(
CoordinatorDynamicConfig
.builder()
@ -451,6 +475,7 @@ public class BalanceSegmentsTest
ImmutableList.of(peon1, peon2)
)
.withBalancerStrategy(predefinedPickOrderStrategy)
.withBroadcastDatasources(broadcastDatasources)
.withDynamicConfigs(
CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(
2
@ -542,6 +567,7 @@ public class BalanceSegmentsTest
)
.withUsedSegmentsInTest(segments)
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build())
.withBroadcastDatasources(broadcastDatasources)
.withBalancerStrategy(balancerStrategy);
}
@ -611,7 +637,7 @@ public class BalanceSegmentsTest
}
@Override
public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders)
public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders, Set<String> broadcastDatasources)
{
return pickOrder.get(pickCounter.getAndIncrement() % pickOrder.size());
}
@ -635,9 +661,9 @@ public class BalanceSegmentsTest
// either decommissioning servers list or acitve ones (ie servers list is [2] or [1, 3])
BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
EasyMock.expect(strategy.pickSegmentToMove(ImmutableList.of(new ServerHolder(druidServer2, peon2, true))))
EasyMock.expect(strategy.pickSegmentToMove(ImmutableList.of(new ServerHolder(druidServer2, peon2, true)), broadcastDatasources))
.andReturn(new BalancerSegmentHolder(druidServer2, segment2));
EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject()))
EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(new BalancerSegmentHolder(druidServer1, segment1));
EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(new ServerHolder(druidServer3, peon3))
@ -656,6 +682,7 @@ public class BalanceSegmentsTest
.build()
)
.withBalancerStrategy(strategy)
.withBroadcastDatasources(broadcastDatasources)
.build();
}
}

View File

@ -35,6 +35,7 @@ public final class DruidClusterBuilder
private @Nullable Set<ServerHolder> realtimes = null;
private final Map<String, Iterable<ServerHolder>> historicals = new HashMap<>();
private @Nullable Set<ServerHolder> brokers = null;
private DruidClusterBuilder()
{
@ -46,6 +47,12 @@ public final class DruidClusterBuilder
return this;
}
public DruidClusterBuilder withBrokers(ServerHolder... brokers)
{
this.brokers = new HashSet<>(Arrays.asList(brokers));
return this;
}
public DruidClusterBuilder addTier(String tierName, ServerHolder... historicals)
{
if (this.historicals.putIfAbsent(tierName, Arrays.asList(historicals)) != null) {
@ -56,6 +63,6 @@ public final class DruidClusterBuilder
public DruidCluster build()
{
return DruidCluster.createDruidClusterFromBuilderInTest(realtimes, historicals);
return DruidCluster.createDruidClusterFromBuilderInTest(realtimes, historicals, brokers);
}
}

View File

@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.ImmutableDruidServerTests;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
@ -136,6 +137,7 @@ public class ReservoirSegmentSamplerTest
@Test
public void getRandomBalancerSegmentHolderTest()
{
EasyMock.expect(druidServer1.getType()).andReturn(ServerType.HISTORICAL).atLeastOnce();
EasyMock.expect(druidServer1.getName()).andReturn("1").atLeastOnce();
EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
@ -143,6 +145,7 @@ public class ReservoirSegmentSamplerTest
EasyMock.expect(druidServer1.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer1);
EasyMock.expect(druidServer2.getType()).andReturn(ServerType.HISTORICAL).atLeastOnce();
EasyMock.expect(druidServer2.getName()).andReturn("2").atLeastOnce();
EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer2.getCurrSize()).andReturn(30L).atLeastOnce();
@ -151,6 +154,7 @@ public class ReservoirSegmentSamplerTest
EasyMock.expect(druidServer2.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer2);
EasyMock.expect(druidServer3.getType()).andReturn(ServerType.HISTORICAL).atLeastOnce();
EasyMock.expect(druidServer3.getName()).andReturn("3").atLeastOnce();
EasyMock.expect(druidServer3.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer3.getCurrSize()).andReturn(30L).atLeastOnce();
@ -159,6 +163,7 @@ public class ReservoirSegmentSamplerTest
EasyMock.expect(druidServer3.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
EasyMock.replay(druidServer3);
EasyMock.expect(druidServer4.getType()).andReturn(ServerType.HISTORICAL).atLeastOnce();
EasyMock.expect(druidServer4.getName()).andReturn("4").atLeastOnce();
EasyMock.expect(druidServer4.getTier()).andReturn("normal").anyTimes();
EasyMock.expect(druidServer4.getCurrSize()).andReturn(30L).atLeastOnce();
@ -186,7 +191,7 @@ public class ReservoirSegmentSamplerTest
Map<DataSegment, Integer> segmentCountMap = new HashMap<>();
for (int i = 0; i < 5000; i++) {
segmentCountMap.put(ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList).getSegment(), 1);
segmentCountMap.put(ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList, Collections.emptySet()).getSegment(), 1);
}
for (DataSegment segment : segments) {

View File

@ -21,7 +21,6 @@ package org.apache.druid.server.coordinator.rules;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
@ -44,15 +43,15 @@ public class BroadcastDistributionRuleSerdeTest
public static List<Object[]> constructorFeeder()
{
return Lists.newArrayList(
new Object[]{new ForeverBroadcastDistributionRule(ImmutableList.of("large_source1", "large_source2"))},
new Object[]{new ForeverBroadcastDistributionRule(ImmutableList.of())},
new Object[]{new ForeverBroadcastDistributionRule(null)},
new Object[]{new IntervalBroadcastDistributionRule(Intervals.of("0/1000"), ImmutableList.of("large_source"))},
new Object[]{new IntervalBroadcastDistributionRule(Intervals.of("0/1000"), ImmutableList.of())},
new Object[]{new IntervalBroadcastDistributionRule(Intervals.of("0/1000"), null)},
new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null, ImmutableList.of("large_source"))},
new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null, ImmutableList.of())},
new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null, null)}
new Object[]{new ForeverBroadcastDistributionRule()},
new Object[]{new ForeverBroadcastDistributionRule()},
new Object[]{new ForeverBroadcastDistributionRule()},
new Object[]{new IntervalBroadcastDistributionRule(Intervals.of("0/1000"))},
new Object[]{new IntervalBroadcastDistributionRule(Intervals.of("0/1000"))},
new Object[]{new IntervalBroadcastDistributionRule(Intervals.of("0/1000"))},
new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null)},
new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null)},
new Object[]{new PeriodBroadcastDistributionRule(new Period(1000), null)}
);
}

View File

@ -19,7 +19,6 @@
package org.apache.druid.server.coordinator.rules;
import com.google.common.collect.ImmutableList;
import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
@ -269,7 +268,7 @@ public class BroadcastDistributionRuleTest
public void testBroadcastToSingleDataSource()
{
final ForeverBroadcastDistributionRule rule =
new ForeverBroadcastDistributionRule(ImmutableList.of("large_source"));
new ForeverBroadcastDistributionRule();
CoordinatorStats stats = rule.run(
null,
@ -285,7 +284,7 @@ public class BroadcastDistributionRuleTest
smallSegment
);
Assert.assertEquals(3L, stats.getGlobalStat(LoadRule.ASSIGNED_COUNT));
Assert.assertEquals(5L, stats.getGlobalStat(LoadRule.ASSIGNED_COUNT));
Assert.assertFalse(stats.hasPerTierStats());
Assert.assertTrue(
@ -295,10 +294,10 @@ public class BroadcastDistributionRuleTest
Assert.assertTrue(
holdersOfLargeSegments2.stream()
.noneMatch(holder -> holder.getPeon().getSegmentsToLoad().contains(smallSegment))
.allMatch(holder -> holder.getPeon().getSegmentsToLoad().contains(smallSegment))
);
Assert.assertFalse(holderOfSmallSegment.getPeon().getSegmentsToLoad().contains(smallSegment));
Assert.assertTrue(holderOfSmallSegment.isServingSegment(smallSegment));
}
private static DruidCoordinatorRuntimeParams makeCoordinartorRuntimeParams(
@ -331,7 +330,7 @@ public class BroadcastDistributionRuleTest
public void testBroadcastDecommissioning()
{
final ForeverBroadcastDistributionRule rule =
new ForeverBroadcastDistributionRule(ImmutableList.of("large_source"));
new ForeverBroadcastDistributionRule();
CoordinatorStats stats = rule.run(
null,
@ -356,7 +355,6 @@ public class BroadcastDistributionRuleTest
public void testBroadcastToMultipleDataSources()
{
final ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule(
ImmutableList.of("large_source", "large_source2")
);
CoordinatorStats stats = rule.run(
@ -392,7 +390,7 @@ public class BroadcastDistributionRuleTest
@Test
public void testBroadcastToAllServers()
{
final ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule(null);
final ForeverBroadcastDistributionRule rule = new ForeverBroadcastDistributionRule();
CoordinatorStats stats = rule.run(
null,
@ -408,14 +406,14 @@ public class BroadcastDistributionRuleTest
smallSegment
);
Assert.assertEquals(6L, stats.getGlobalStat(LoadRule.ASSIGNED_COUNT));
Assert.assertEquals(5L, stats.getGlobalStat(LoadRule.ASSIGNED_COUNT));
Assert.assertFalse(stats.hasPerTierStats());
Assert.assertTrue(
druidCluster
.getAllServers()
.stream()
.allMatch(holder -> holder.getPeon().getSegmentsToLoad().contains(smallSegment))
.allMatch(holder -> holder.isLoadingSegment(smallSegment) || holder.isServingSegment(smallSegment))
);
}
}

View File

@ -33,6 +33,7 @@ import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.selector.CustomTierSelectorStrategyConfig;
import org.apache.druid.client.selector.ServerSelectorStrategy;
import org.apache.druid.client.selector.TierSelectorStrategy;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.CacheModule;
@ -42,9 +43,11 @@ import org.apache.druid.guice.JoinableFactoryModule;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.guice.SegmentWranglerModule;
import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.RetryQueryRunnerConfig;
@ -52,7 +55,12 @@ import org.apache.druid.query.lookup.LookupModule;
import org.apache.druid.server.BrokerQueryResource;
import org.apache.druid.server.ClientInfoResource;
import org.apache.druid.server.ClientQuerySegmentWalker;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordination.ZkCoordinator;
import org.apache.druid.server.http.BrokerResource;
import org.apache.druid.server.http.HistoricalResource;
import org.apache.druid.server.http.SegmentListerResource;
import org.apache.druid.server.http.SelfDiscoveryResource;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.QueryCountStatsProvider;
@ -123,12 +131,19 @@ public class CliBroker extends ServerRunnable
Jerseys.addResource(binder, HttpServerInventoryViewResource.class);
LifecycleModule.register(binder, Server.class);
binder.bind(SegmentManager.class).in(LazySingleton.class);
binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
binder.bind(ServerTypeConfig.class).toInstance(new ServerTypeConfig(ServerType.BROKER));
Jerseys.addResource(binder, HistoricalResource.class);
Jerseys.addResource(binder, SegmentListerResource.class);
LifecycleModule.register(binder, ZkCoordinator.class);
bindNodeRoleAndAnnouncer(
binder,
DiscoverySideEffectsProvider
.builder(NodeRole.BROKER)
.serviceClasses(ImmutableList.of(LookupNodeService.class))
.serviceClasses(ImmutableList.of(DataNodeService.class, LookupNodeService.class))
.useLegacyAnnouncer(true)
.build()
);

View File

@ -25,9 +25,9 @@ import com.google.inject.Inject;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.name.Names;
import com.google.inject.util.Providers;
import io.airlift.airline.Command;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.DruidServerConfig;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeRole;
@ -43,6 +43,7 @@ import org.apache.druid.guice.JoinableFactoryModule;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.guice.QueryablePeonModule;
@ -60,12 +61,13 @@ import org.apache.druid.indexing.worker.http.ShuffleResource;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.lookup.LookupModule;
import org.apache.druid.segment.realtime.CliIndexerDataSegmentServerAnnouncerLifecycleHandler;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.SegmentLoadDropHandler;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordination.ZkCoordinator;
import org.apache.druid.server.http.HistoricalResource;
import org.apache.druid.server.http.SegmentListerResource;
import org.apache.druid.server.initialization.jetty.CliIndexerServerModule;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
@ -138,14 +140,14 @@ public class CliIndexer extends ServerRunnable
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
Jerseys.addResource(binder, SegmentListerResource.class);
LifecycleModule.register(binder, CliIndexerDataSegmentServerAnnouncerLifecycleHandler.class);
Jerseys.addResource(binder, ShuffleResource.class);
LifecycleModule.register(binder, Server.class, RemoteChatHandler.class);
binder.bind(SegmentLoadDropHandler.class).toProvider(Providers.of(null));
binder.bind(SegmentManager.class).in(LazySingleton.class);
binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
Jerseys.addResource(binder, HistoricalResource.class);
LifecycleModule.register(binder, ZkCoordinator.class);
bindNodeRoleAndAnnouncer(
binder,
@ -186,11 +188,11 @@ public class CliIndexer extends ServerRunnable
@Provides
@LazySingleton
public DataNodeService getDataNodeService()
public DataNodeService getDataNodeService(DruidServerConfig serverConfig)
{
return new DataNodeService(
DruidServer.DEFAULT_TIER,
0L,
serverConfig.getMaxSize(),
ServerType.INDEXER_EXECUTOR,
DruidServer.DEFAULT_PRIORITY
);

View File

@ -61,7 +61,6 @@ import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Parent;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
@ -109,15 +108,16 @@ import org.apache.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffN
import org.apache.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierFactory;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.BatchDataSegmentAnnouncer;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordination.ZkCoordinator;
import org.apache.druid.server.http.HistoricalResource;
import org.apache.druid.server.http.SegmentListerResource;
import org.apache.druid.server.initialization.jetty.ChatHandlerServerModule;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
import org.eclipse.jetty.server.Server;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.List;
@ -154,6 +154,14 @@ public class CliPeon extends GuiceRunnable
@Option(name = "--nodeType", title = "nodeType", description = "Set the node type to expose on ZK")
public String serverType = "indexer-executor";
/**
* If set to "true", the peon will bind classes necessary for loading broadcast segments. This is used for
* queryable tasks, such as streaming ingestion tasks.
*/
@Option(name = "--loadBroadcastSegments", title = "loadBroadcastSegments", description = "Enable loading of broadcast segments")
public String loadBroadcastSegments = "false";
private static final Logger log = new Logger(CliPeon.class);
@Inject
@ -174,6 +182,7 @@ public class CliPeon extends GuiceRunnable
new JoinableFactoryModule(),
new Module()
{
@SuppressForbidden(reason = "System#out, System#err")
@Override
public void configure(Binder binder)
{
@ -218,6 +227,13 @@ public class CliPeon extends GuiceRunnable
Jerseys.addResource(binder, SegmentListerResource.class);
binder.bind(ServerTypeConfig.class).toInstance(new ServerTypeConfig(ServerType.fromString(serverType)));
LifecycleModule.register(binder, Server.class);
if ("true".equals(loadBroadcastSegments)) {
binder.bind(SegmentManager.class).in(LazySingleton.class);
binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
Jerseys.addResource(binder, HistoricalResource.class);
LifecycleModule.register(binder, ZkCoordinator.class);
}
}
@Provides
@ -247,16 +263,6 @@ public class CliPeon extends GuiceRunnable
{
return task.getId();
}
@Provides
public SegmentListerResource getSegmentListerResource(
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
@Nullable BatchDataSegmentAnnouncer announcer
)
{
return new SegmentListerResource(jsonMapper, smileMapper, announcer, null);
}
},
new QueryablePeonModule(),
new IndexingServiceFirehoseModule(),

View File

@ -50,7 +50,9 @@ public class MainTest
//new Object[]{new CliInternalHadoopIndexer()},
new Object[]{new CliMiddleManager()},
new Object[]{new CliRouter()}
new Object[]{new CliRouter()},
new Object[]{new CliIndexer()}
);
}

View File

@ -355,7 +355,7 @@ public class DruidSchema extends AbstractSchema
AvailableSegmentMetadata segmentMetadata = knownSegments != null ? knownSegments.get(segment.getId()) : null;
if (segmentMetadata == null) {
// segmentReplicatable is used to determine if segments are served by historical or realtime servers
long isRealtime = server.segmentReplicatable() ? 0 : 1;
long isRealtime = server.isSegmentReplicationTarget() ? 0 : 1;
segmentMetadata = AvailableSegmentMetadata.builder(
segment,
isRealtime,
@ -366,7 +366,7 @@ public class DruidSchema extends AbstractSchema
// Unknown segment.
setAvailableSegmentMetadata(segment.getId(), segmentMetadata);
segmentsNeedingRefresh.add(segment.getId());
if (!server.segmentReplicatable()) {
if (!server.isSegmentReplicationTarget()) {
log.debug("Added new mutable segment[%s].", segment.getId());
mutableSegments.add(segment.getId());
} else {
@ -384,7 +384,7 @@ public class DruidSchema extends AbstractSchema
.withRealtime(recomputeIsRealtime(servers))
.build();
knownSegments.put(segment.getId(), metadataWithNumReplicas);
if (server.segmentReplicatable()) {
if (server.isSegmentReplicationTarget()) {
// If a segment shows up on a replicatable (historical) server at any point, then it must be immutable,
// even if it's also available on non-replicatable (realtime) servers.
mutableSegments.remove(segment.getId());

View File

@ -11887,7 +11887,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testNestedGroupByOnInlineDataSourceWithFilterIsNotSupported(Map<String, Object> queryContext) throws Exception
public void testNestedGroupByOnInlineDataSourceWithFilter(Map<String, Object> queryContext) throws Exception
{
try {
testQuery(