Merge pull request #351 from metamx/az

Bridge Multiple Druid Clusters
fjy committed 2014-01-21 17:33:15 -08:00
commit 6b39591030
61 changed files with 2247 additions and 446 deletions

View File

@ -97,7 +97,6 @@ This describes the data schema for the output Druid segment. More information ab
|aggregators|Array of Objects|The list of aggregators to use to aggregate colliding rows together.|yes|
|dataSource|String|The name of the dataSource that the segment belongs to.|yes|
|indexGranularity|String|The granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows.|yes|
|segmentGranularity|String|The granularity of the segment as a whole. This is generally larger than the index granularity and describes the rate at which the realtime server will push segments out for historical servers to take over.|yes|
|shardSpec|Object|This describes the shard that is represented by this server. This must be specified properly in order to have multiple realtime nodes indexing the same data stream in a sharded fashion.|no|
### Config

View File

@ -219,9 +219,9 @@ Congratulations! The segment has completed building. Once a segment is built, a
You should see the following logs on the coordinator:
```bash
2013-10-09 21:41:54,368 INFO [Coordinator-Exec--0] io.druid.server.coordinator.DruidCoordinatorLogger - [_default_tier] : Assigned 1 segments among 1 servers
2013-10-09 21:41:54,369 INFO [Coordinator-Exec--0] io.druid.server.coordinator.DruidCoordinatorLogger - Load Queues:
2013-10-09 21:41:54,369 INFO [Coordinator-Exec--0] io.druid.server.coordinator.DruidCoordinatorLogger - Server[localhost:8081, historical, _default_tier] has 1 left to load, 0 left to drop, 4,477 bytes queued, 4,477 bytes served.
2013-10-09 21:41:54,368 INFO [Coordinator-Exec--0] io.druid.server.coordinator.helper.DruidCoordinatorLogger - [_default_tier] : Assigned 1 segments among 1 servers
2013-10-09 21:41:54,369 INFO [Coordinator-Exec--0] io.druid.server.coordinator.helper.DruidCoordinatorLogger - Load Queues:
2013-10-09 21:41:54,369 INFO [Coordinator-Exec--0] io.druid.server.coordinator.helper.DruidCoordinatorLogger - Server[localhost:8081, historical, _default_tier] has 1 left to load, 0 left to drop, 4,477 bytes queued, 4,477 bytes served.
```
These logs indicate that the coordinator has assigned our new segment to the historical node to download and serve. If you look at the historical node logs, you should see:

View File

@ -313,7 +313,7 @@ public class CachingClusteredClient<T> implements QueryRunner<T>
final MultipleSpecificSegmentSpec segmentSpec = new MultipleSpecificSegmentSpec(descriptors);
List<Interval> intervals = segmentSpec.getIntervals();
if ("realtime".equals(server.getType()) || !populateCache || isBySegment) {
if (server.isRealtime() || !populateCache || isBySegment) {
resultSeqToAdd = clientQueryable.run(query.withQuerySegmentSpec(segmentSpec));
} else {
resultSeqToAdd = toolChest.mergeSequences(

View File

@ -36,7 +36,10 @@ import java.util.concurrent.ConcurrentMap;
*/
public class DruidServer implements Comparable
{
public static final int DEFAULT_PRIORITY = 0;
public static final int DEFAULT_NUM_REPLICANTS = 2;
public static final String DEFAULT_TIER = "_default_tier";
private static final Logger log = new Logger(DruidServer.class);
private final Object lock = new Object();
@ -59,7 +62,8 @@ public class DruidServer implements Comparable
node.getHost(),
config.getMaxSize(),
type,
config.getTier()
config.getTier(),
DEFAULT_PRIORITY
);
}
@ -69,10 +73,11 @@ public class DruidServer implements Comparable
@JsonProperty("host") String host,
@JsonProperty("maxSize") long maxSize,
@JsonProperty("type") String type,
@JsonProperty("tier") String tier
@JsonProperty("tier") String tier,
@JsonProperty("priority") int priority
)
{
this.metadata = new DruidServerMetadata(name, host, maxSize, type, tier);
this.metadata = new DruidServerMetadata(name, host, maxSize, type, tier, priority);
this.dataSources = new ConcurrentHashMap<String, DruidDataSource>();
this.segments = new ConcurrentHashMap<String, DataSegment>();
@ -118,6 +123,12 @@ public class DruidServer implements Comparable
return metadata.getTier();
}
@JsonProperty
public int getPriority()
{
return metadata.getPriority();
}
@JsonProperty
public Map<String, DataSegment> getSegments()
{
@ -125,6 +136,11 @@ public class DruidServer implements Comparable
return Collections.unmodifiableMap(segments);
}
public boolean isRealtime()
{
return getType().equalsIgnoreCase("realtime");
}
public DataSegment getSegment(String segmentName)
{
return segments.get(segmentName);
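
For context, a minimal sketch (not part of this diff) of constructing a `DruidServer` with the new priority argument. The leading name parameter is unchanged by this PR, and the concrete host, size, and priority values below are made up; higher-priority servers are preferred at query time by the selector changes further down.

```java
// Hypothetical values for illustration only.
DruidServer server = new DruidServer(
    "historical-1",                    // name (unchanged leading parameter)
    "historical-1.example.com:8081",   // host
    10000000000L,                      // maxSize in bytes
    "historical",                      // type
    DruidServer.DEFAULT_TIER,          // tier
    10                                 // priority, new in this PR (default is DEFAULT_PRIORITY = 0)
);
```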

View File

@ -32,7 +32,10 @@ public class DruidServerConfig
private long maxSize = 0;
@JsonProperty
private String tier = "_default_tier";
private String tier = DruidServer.DEFAULT_TIER;
@JsonProperty
private int priority = DruidServer.DEFAULT_PRIORITY;
public long getMaxSize()
{
@ -43,4 +46,9 @@ public class DruidServerConfig
{
return tier;
}
public int getPriority()
{
return priority;
}
}

View File

@ -20,10 +20,14 @@
package io.druid.client.selector;
import com.google.common.primitives.Ints;
import com.metamx.common.ISE;
import io.druid.timeline.DataSegment;
import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
public class ConnectionCountServerSelectorStrategy implements ServerSelectorStrategy
{
@ -37,8 +41,25 @@ public class ConnectionCountServerSelectorStrategy implements ServerSelectorStra
};
@Override
public QueryableDruidServer pick(Set<QueryableDruidServer> servers)
public QueryableDruidServer pick(
TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment
)
{
return Collections.min(servers, comparator);
final Map.Entry<Integer, Set<QueryableDruidServer>> highestPriorityServers = prioritizedServers.pollLastEntry();
if (highestPriorityServers == null) {
return null;
}
final Set<QueryableDruidServer> servers = highestPriorityServers.getValue();
final int size = servers.size();
switch (size) {
case 0:
throw new ISE("[%s] Something hella weird going on here. We should not be here", segment.getIdentifier());
case 1:
return highestPriorityServers.getValue().iterator().next();
default:
return Collections.min(servers, comparator);
}
}
}

View File

@ -20,17 +20,36 @@
package io.druid.client.selector;
import com.google.common.collect.Iterators;
import com.metamx.common.ISE;
import io.druid.timeline.DataSegment;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
public class RandomServerSelectorStrategy implements ServerSelectorStrategy
{
private static final Random random = new Random();
@Override
public QueryableDruidServer pick(Set<QueryableDruidServer> servers)
public QueryableDruidServer pick(TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment)
{
return Iterators.get(servers.iterator(), random.nextInt(servers.size()));
final Map.Entry<Integer, Set<QueryableDruidServer>> highestPriorityServers = prioritizedServers.pollLastEntry();
if (highestPriorityServers == null) {
return null;
}
final Set<QueryableDruidServer> servers = highestPriorityServers.getValue();
final int size = servers.size();
switch (size) {
case 0:
throw new ISE("[%s] Something hella weird going on here. We should not be here", segment.getIdentifier());
case 1:
return highestPriorityServers.getValue().iterator().next();
default:
return Iterators.get(servers.iterator(), random.nextInt(size));
}
}
}

View File

@ -19,18 +19,21 @@
package io.druid.client.selector;
import com.google.api.client.util.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.metamx.emitter.EmittingLogger;
import io.druid.timeline.DataSegment;
import java.util.Collections;
import java.util.Comparator;
import java.util.Set;
import java.util.TreeMap;
/**
*/
public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
{
private static final EmittingLogger log = new EmittingLogger(ServerSelector.class);
private final Set<QueryableDruidServer> servers = Sets.newHashSet();
private final DataSegment segment;
@ -76,12 +79,17 @@ public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
public QueryableDruidServer pick()
{
synchronized (this) {
final int size = servers.size();
switch (size) {
case 0: return null;
case 1: return servers.iterator().next();
default: return strategy.pick(servers);
TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers = Maps.newTreeMap();
for (QueryableDruidServer server : servers) {
Set<QueryableDruidServer> theServers = prioritizedServers.get(server.getServer().getPriority());
if (theServers == null) {
theServers = Sets.newHashSet();
prioritizedServers.put(server.getServer().getPriority(), theServers);
}
theServers.add(server);
}
return strategy.pick(prioritizedServers, segment);
}
}
}

View File

@ -21,8 +21,10 @@ package io.druid.client.selector;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.timeline.DataSegment;
import java.util.Set;
import java.util.TreeMap;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RandomServerSelectorStrategy.class)
@JsonSubTypes(value = {
@ -31,5 +33,5 @@ import java.util.Set;
})
public interface ServerSelectorStrategy
{
public QueryableDruidServer pick(Set<QueryableDruidServer> servers);
public QueryableDruidServer pick(TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment);
}
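
To illustrate the new strategy contract, here is a hedged sketch of a custom implementation (not part of this PR): `ServerSelector` hands strategies a `TreeMap` keyed by priority, so the last entry holds the highest-priority servers able to serve the segment.

```java
package io.druid.client.selector;

import io.druid.timeline.DataSegment;

import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical strategy: always pick an arbitrary server from the
// highest-priority group. The built-in strategies above do the same grouping
// but break ties by connection count or at random.
public class HighestPriorityServerSelectorStrategy implements ServerSelectorStrategy
{
  @Override
  public QueryableDruidServer pick(
      TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment
  )
  {
    final Map.Entry<Integer, Set<QueryableDruidServer>> best = prioritizedServers.lastEntry();
    return best == null ? null : best.getValue().iterator().next();
  }
}
```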

View File

@ -63,6 +63,7 @@ public class Announcer
private final PathChildrenCacheFactory factory;
private final List<Pair<String, byte[]>> toAnnounce = Lists.newArrayList();
private final List<Pair<String, byte[]>> toUpdate = Lists.newArrayList();
private final ConcurrentMap<String, PathChildrenCache> listeners = new MapMaker().makeMap();
private final ConcurrentMap<String, ConcurrentMap<String, byte[]>> announcements = new MapMaker().makeMap();
private final List<String> parentsIBuilt = new CopyOnWriteArrayList<String>();
@ -92,6 +93,11 @@ public class Announcer
announce(pair.lhs, pair.rhs);
}
toAnnounce.clear();
for (Pair<String, byte[]> pair : toUpdate) {
update(pair.lhs, pair.rhs);
}
toUpdate.clear();
}
}
@ -268,6 +274,13 @@ public class Announcer
public void update(final String path, final byte[] bytes)
{
synchronized (toAnnounce) {
if (!started) {
toUpdate.add(Pair.of(path, bytes));
return;
}
}
final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
final String parentPath = pathAndNode.getPath();
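
A brief usage sketch of the queuing behavior added here (assumed usage, not from this PR; the path and payloads are illustrative): `update()` calls issued before `start()` are now queued alongside pending announcements and replayed when the Announcer starts.

```java
void announceBeforeStart(Announcer announcer, byte[] initialBytes, byte[] updatedBytes)
{
  final String path = "/druid/announcements/example-host:8083"; // illustrative path
  announcer.announce(path, initialBytes); // queued in toAnnounce until start()
  announcer.update(path, updatedBytes);   // now also queued, in toUpdate, until start()
  announcer.start();                      // queued announcements and updates are applied here
}
```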

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
@ -31,14 +32,14 @@ import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import io.druid.client.DruidServer;
import io.druid.concurrent.Execs;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Json;
import io.druid.server.coordinator.rules.PeriodLoadRule;
import io.druid.server.coordinator.rules.ForeverLoadRule;
import io.druid.server.coordinator.rules.Rule;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
@ -86,10 +87,11 @@ public class DatabaseRuleManager
}
final List<Rule> defaultRules = Arrays.<Rule>asList(
new PeriodLoadRule(
new Period("P5000Y"),
2,
"_default_tier"
new ForeverLoadRule(
ImmutableMap.<String, Integer>of(
DruidServer.DEFAULT_TIER,
DruidServer.DEFAULT_NUM_REPLICANTS
)
)
);
final String version = new DateTime().toString();
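
For illustration, a hedged sketch (not part of this diff) of a rule that spans more than one tier using the same map-based constructor as the new default rule; the "hot" tier name is hypothetical.

```java
import com.google.common.collect.ImmutableMap;
import io.druid.client.DruidServer;
import io.druid.server.coordinator.rules.ForeverLoadRule;
import io.druid.server.coordinator.rules.Rule;

class TieredLoadRuleExample
{
  // Hypothetical: two replicas in a "hot" tier plus one in the default tier.
  static Rule twoTierForeverLoad()
  {
    return new ForeverLoadRule(
        ImmutableMap.<String, Integer>of(
            "hot", 2,
            DruidServer.DEFAULT_TIER, 1
        )
    );
  }
}
```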

View File

@ -462,7 +462,8 @@ public class DatabaseSegmentManager
}
}
private String getSegmentsTable() {
private String getSegmentsTable()
{
return dbTables.get().getSegmentsTable();
}
}

View File

@ -67,7 +67,8 @@ public class StorageNodeModule implements Module
node.getHost(),
config.getMaxSize(),
nodeType.getNodeType(),
config.getTier()
config.getTier(),
config.getPriority()
);
}
}

View File

@ -704,7 +704,7 @@ public class RealtimePlumber implements Plumber
return ServerView.CallbackAction.UNREGISTER;
}
if ("realtime".equals(server.getType())) {
if (server.isRealtime()) {
return ServerView.CallbackAction.CONTINUE;
}

View File

@ -0,0 +1,36 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.bridge;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
*/
@BindingAnnotation
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Bridge
{
}

View File

@ -0,0 +1,31 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.bridge;
import io.druid.curator.CuratorConfig;
import org.skife.config.Config;
/**
*/
public abstract class BridgeCuratorConfig extends CuratorConfig
{
@Config("druid.bridge.zk.service.host")
public abstract String getParentZkHosts();
}

View File

@ -0,0 +1,126 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.bridge;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.inject.Inject;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.client.selector.Server;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.guice.annotations.Global;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.SegmentDescriptor;
import org.joda.time.Interval;
import java.net.URL;
import java.util.List;
/**
*/
public class BridgeQuerySegmentWalker implements QuerySegmentWalker
{
private static final Logger log = new Logger(BridgeQuerySegmentWalker.class);
private final ServerDiscoverySelector brokerSelector;
private final HttpClient httpClient;
private final ObjectMapper jsonMapper;
private final StatusResponseHandler responseHandler;
@Inject
public BridgeQuerySegmentWalker(
ServerDiscoverySelector brokerSelector,
@Global HttpClient httpClient,
ObjectMapper jsonMapper
)
{
this.brokerSelector = brokerSelector;
this.httpClient = httpClient;
this.jsonMapper = jsonMapper;
this.responseHandler = new StatusResponseHandler(Charsets.UTF_8);
}
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(
Query<T> query, Iterable<Interval> intervals
)
{
return makeRunner();
}
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(
Query<T> query, Iterable<SegmentDescriptor> specs
)
{
return makeRunner();
}
private <T> QueryRunner<T> makeRunner()
{
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(Query<T> query)
{
try {
Server instance = brokerSelector.pick();
if (instance == null) {
return Sequences.empty();
}
final String url = String.format(
"http://%s/druid/v2/",
brokerSelector.pick().getHost()
);
StatusResponseHolder response = httpClient.post(new URL(url))
.setContent(
"application/json",
jsonMapper.writeValueAsBytes(query)
)
.go(responseHandler)
.get();
List<T> results = jsonMapper.readValue(
response.getContent(), new TypeReference<List<T>>()
{
}
);
return Sequences.simple(results);
}
catch (Exception e) {
log.error(e, "Exception with bridge query");
return Sequences.empty();
}
}
};
}
}

View File

@ -0,0 +1,132 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.bridge;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.client.DruidServer;
import io.druid.client.ServerView;
import io.druid.concurrent.Execs;
import io.druid.db.DatabaseSegmentManager;
import io.druid.segment.realtime.DbSegmentPublisher;
import io.druid.server.coordination.BaseZkCoordinator;
import io.druid.server.coordination.DataSegmentChangeCallback;
import io.druid.server.coordination.DataSegmentChangeHandler;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import java.util.concurrent.ExecutorService;
/**
*/
public class BridgeZkCoordinator extends BaseZkCoordinator
{
private static final Logger log = new Logger(BaseZkCoordinator.class);
private final DbSegmentPublisher dbSegmentPublisher;
private final DatabaseSegmentManager databaseSegmentManager;
private final ServerView serverView;
private final ExecutorService exec = Execs.singleThreaded("BridgeZkCoordinatorServerView-%s");
@Inject
public BridgeZkCoordinator(
ObjectMapper jsonMapper,
ZkPathsConfig zkPaths,
DruidServerMetadata me,
@Bridge CuratorFramework curator,
DbSegmentPublisher dbSegmentPublisher,
DatabaseSegmentManager databaseSegmentManager,
ServerView serverView
)
{
super(jsonMapper, zkPaths, me, curator);
this.dbSegmentPublisher = dbSegmentPublisher;
this.databaseSegmentManager = databaseSegmentManager;
this.serverView = serverView;
}
@Override
public void loadLocalCache()
{
// do nothing
}
@Override
public DataSegmentChangeHandler getDataSegmentChangeHandler()
{
return BridgeZkCoordinator.this;
}
@Override
public void addSegment(final DataSegment segment, final DataSegmentChangeCallback callback)
{
try {
log.info("Publishing segment %s", segment.getIdentifier());
dbSegmentPublisher.publishSegment(segment);
serverView.registerSegmentCallback(
exec,
new ServerView.BaseSegmentCallback()
{
@Override
public ServerView.CallbackAction segmentAdded(
DruidServer server, DataSegment theSegment
)
{
if (theSegment.equals(segment)) {
callback.execute();
}
return ServerView.CallbackAction.CONTINUE;
}
}
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@Override
public void removeSegment(final DataSegment segment, final DataSegmentChangeCallback callback)
{
databaseSegmentManager.removeSegment(segment.getDataSource(), segment.getIdentifier());
serverView.registerSegmentCallback(
exec,
new ServerView.BaseSegmentCallback()
{
@Override
public ServerView.CallbackAction segmentRemoved(
DruidServer server, DataSegment theSegment
)
{
if (theSegment.equals(segment)) {
callback.execute();
}
return ServerView.CallbackAction.CONTINUE;
}
}
);
}
}

View File

@ -0,0 +1,391 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.bridge;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import io.druid.client.DruidServer;
import io.druid.client.ServerInventoryView;
import io.druid.client.ServerView;
import io.druid.concurrent.Execs;
import io.druid.curator.announcement.Announcer;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self;
import io.druid.server.DruidNode;
import io.druid.server.coordination.AbstractDataSegmentAnnouncer;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.utils.ZKPaths;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
@ManageLifecycle
public class DruidClusterBridge
{
public static final String BRIDGE_OWNER_NODE = "_BRIDGE";
public static final String NODE_TYPE = "bridge";
private static final EmittingLogger log = new EmittingLogger(DruidClusterBridge.class);
private final ObjectMapper jsonMapper;
private final DruidClusterBridgeConfig config;
private final ScheduledExecutorService exec;
private final DruidNode self;
// Communicates to the ZK cluster that this bridge node is deployed at
private final CuratorFramework curator;
private final AtomicReference<LeaderLatch> leaderLatch;
// Communicates to the remote (parent) ZK cluster
private final BridgeZkCoordinator bridgeZkCoordinator;
private final Announcer announcer;
private final ServerInventoryView<Object> serverInventoryView;
private final Map<DataSegment, Integer> segments = Maps.newHashMap();
private final Object lock = new Object();
private volatile boolean started = false;
private volatile boolean leader = false;
@Inject
public DruidClusterBridge(
ObjectMapper jsonMapper,
DruidClusterBridgeConfig config,
ScheduledExecutorFactory scheduledExecutorFactory,
@Self DruidNode self,
CuratorFramework curator,
AtomicReference<LeaderLatch> leaderLatch,
BridgeZkCoordinator bridgeZkCoordinator,
@Bridge Announcer announcer,
@Bridge final AbstractDataSegmentAnnouncer dataSegmentAnnouncer,
ServerInventoryView serverInventoryView
)
{
this.jsonMapper = jsonMapper;
this.config = config;
this.bridgeZkCoordinator = bridgeZkCoordinator;
this.announcer = announcer;
this.serverInventoryView = serverInventoryView;
this.curator = curator;
this.leaderLatch = leaderLatch;
this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d");
this.self = self;
ExecutorService serverInventoryViewExec = Executors.newFixedThreadPool(
1,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("DruidClusterBridge-ServerInventoryView-%d")
.build()
);
serverInventoryView.registerSegmentCallback(
serverInventoryViewExec,
new ServerView.BaseSegmentCallback()
{
@Override
public ServerView.CallbackAction segmentAdded(
DruidServer server, DataSegment segment
)
{
try {
synchronized (lock) {
Integer count = segments.get(segment);
if (count == null) {
segments.put(segment, 1);
dataSegmentAnnouncer.announceSegment(segment);
} else {
segments.put(segment, count + 1);
}
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return ServerView.CallbackAction.CONTINUE;
}
@Override
public ServerView.CallbackAction segmentRemoved(DruidServer server, DataSegment segment)
{
try {
synchronized (lock) {
serverRemovedSegment(dataSegmentAnnouncer, segment, server);
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return ServerView.CallbackAction.CONTINUE;
}
}
);
serverInventoryView.registerServerCallback(
serverInventoryViewExec,
new ServerView.ServerCallback()
{
@Override
public ServerView.CallbackAction serverRemoved(DruidServer server)
{
try {
for (DataSegment dataSegment : server.getSegments().values()) {
serverRemovedSegment(dataSegmentAnnouncer, dataSegment, server);
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return ServerView.CallbackAction.CONTINUE;
}
}
);
}
public boolean isLeader()
{
return leader;
}
@LifecycleStart
public void start()
{
synchronized (lock) {
if (started) {
return;
}
started = true;
createNewLeaderLatch();
try {
leaderLatch.get().start();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
private LeaderLatch createNewLeaderLatch()
{
final LeaderLatch newLeaderLatch = new LeaderLatch(
curator, ZKPaths.makePath(config.getConnectorPath(), BRIDGE_OWNER_NODE), self.getHost()
);
newLeaderLatch.addListener(
new LeaderLatchListener()
{
@Override
public void isLeader()
{
becomeLeader();
}
@Override
public void notLeader()
{
stopBeingLeader();
}
},
Execs.singleThreaded("CoordinatorLeader-%s")
);
return leaderLatch.getAndSet(newLeaderLatch);
}
@LifecycleStop
public void stop()
{
synchronized (lock) {
if (!started) {
return;
}
stopBeingLeader();
try {
leaderLatch.get().close();
}
catch (IOException e) {
log.warn(e, "Unable to close leaderLatch, ignoring");
}
exec.shutdownNow();
started = false;
}
}
private void becomeLeader()
{
synchronized (lock) {
if (!started) {
return;
}
log.info("Go-Go Gadgetmobile! Starting bridge in %s", config.getStartDelay());
try {
bridgeZkCoordinator.start();
serverInventoryView.start();
ScheduledExecutors.scheduleWithFixedDelay(
exec,
config.getStartDelay(),
config.getPeriod(),
new Callable<ScheduledExecutors.Signal>()
{
@Override
public ScheduledExecutors.Signal call()
{
if (leader) {
Iterable<DruidServer> servers = FunctionalIterable
.create(serverInventoryView.getInventory())
.filter(
new Predicate<DruidServer>()
{
@Override
public boolean apply(
DruidServer input
)
{
return !input.isRealtime();
}
}
);
long totalMaxSize = 0;
for (DruidServer server : servers) {
totalMaxSize += server.getMaxSize();
}
if (totalMaxSize == 0) {
log.warn("No servers founds!");
} else {
DruidServerMetadata me = new DruidServerMetadata(
self.getHost(),
self.getHost(),
totalMaxSize,
NODE_TYPE,
config.getTier(),
config.getPriority()
);
try {
final String path = ZKPaths.makePath(config.getAnnouncementsPath(), self.getHost());
log.info("Updating [%s] to have a maxSize of[%,d] bytes", self.getHost(), totalMaxSize);
announcer.update(path, jsonMapper.writeValueAsBytes(me));
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
if (leader) { // (We might no longer be leader)
return ScheduledExecutors.Signal.REPEAT;
} else {
return ScheduledExecutors.Signal.STOP;
}
}
}
);
leader = true;
}
catch (Exception e) {
log.makeAlert(e, "Exception becoming leader")
.emit();
final LeaderLatch oldLatch = createNewLeaderLatch();
Closeables.closeQuietly(oldLatch);
try {
leaderLatch.get().start();
}
catch (Exception e1) {
// If an exception gets thrown out here, then the bridge will zombie out 'cause it won't be looking for
// the latch anymore. I don't believe it's actually possible for an Exception to throw out here, but
// Curator likes to have "throws Exception" on methods so it might happen...
log.makeAlert(e1, "I am a zombie")
.emit();
}
}
}
}
private void stopBeingLeader()
{
synchronized (lock) {
try {
log.info("I'll get you next time, Gadget. Next time!");
bridgeZkCoordinator.stop();
serverInventoryView.stop();
leader = false;
}
catch (Exception e) {
log.makeAlert(e, "Unable to stopBeingLeader").emit();
}
}
}
private void serverRemovedSegment(DataSegmentAnnouncer dataSegmentAnnouncer, DataSegment segment, DruidServer server)
throws IOException
{
Integer count = segments.get(segment);
if (count != null) {
if (count == 1) {
dataSegmentAnnouncer.unannounceSegment(segment);
segments.remove(segment);
} else {
segments.put(segment, count - 1);
}
} else {
log.makeAlert("Trying to remove a segment that was never added?")
.addData("server", server.getHost())
.addData("segmentId", segment.getIdentifier())
.emit();
}
}
}

View File

@ -0,0 +1,52 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.bridge;
import io.druid.client.DruidServer;
import io.druid.server.initialization.ZkPathsConfig;
import org.joda.time.Duration;
import org.skife.config.Config;
import org.skife.config.Default;
/**
*/
public abstract class DruidClusterBridgeConfig extends ZkPathsConfig
{
@Config("druid.server.tier")
@Default(DruidServer.DEFAULT_TIER)
public abstract String getTier();
@Config("druid.bridge.startDelay")
@Default("PT300s")
public abstract Duration getStartDelay();
@Config("druid.bridge.period")
@Default("PT60s")
public abstract Duration getPeriod();
@Config("druid.bridge.broker.serviceName")
public abstract String getBrokerServiceName();
@Config("druid.server.priority")
public int getPriority()
{
return DruidServer.DEFAULT_PRIORITY;
}
}

View File

@ -0,0 +1,196 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import java.io.IOException;
/**
*/
public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
{
private static final EmittingLogger log = new EmittingLogger(ZkCoordinator.class);
private final Object lock = new Object();
private final ObjectMapper jsonMapper;
private final ZkPathsConfig zkPaths;
private final DruidServerMetadata me;
private final CuratorFramework curator;
private volatile PathChildrenCache loadQueueCache;
private volatile boolean started;
public BaseZkCoordinator(
ObjectMapper jsonMapper,
ZkPathsConfig zkPaths,
DruidServerMetadata me,
CuratorFramework curator
)
{
this.jsonMapper = jsonMapper;
this.zkPaths = zkPaths;
this.me = me;
this.curator = curator;
}
@LifecycleStart
public void start() throws IOException
{
synchronized (lock) {
if (started) {
return;
}
log.info("Starting zkCoordinator for server[%s]", me);
final String loadQueueLocation = ZKPaths.makePath(zkPaths.getLoadQueuePath(), me.getName());
final String servedSegmentsLocation = ZKPaths.makePath(zkPaths.getServedSegmentsPath(), me.getName());
final String liveSegmentsLocation = ZKPaths.makePath(zkPaths.getLiveSegmentsPath(), me.getName());
loadQueueCache = new PathChildrenCache(
curator,
loadQueueLocation,
true,
true,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build()
);
try {
curator.newNamespaceAwareEnsurePath(loadQueueLocation).ensure(curator.getZookeeperClient());
curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient());
curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient());
loadLocalCache();
loadQueueCache.getListenable().addListener(
new PathChildrenCacheListener()
{
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{
final ChildData child = event.getData();
switch (event.getType()) {
case CHILD_ADDED:
final String path = child.getPath();
final DataSegmentChangeRequest segment = jsonMapper.readValue(
child.getData(), DataSegmentChangeRequest.class
);
log.info("New node[%s] with segmentClass[%s]", path, segment.getClass());
try {
segment.go(
getDataSegmentChangeHandler(),
new DataSegmentChangeCallback()
{
boolean hasRun = false;
@Override
public void execute()
{
try {
if (!hasRun) {
curator.delete().guaranteed().forPath(path);
log.info("Completed processing for node[%s]", path);
hasRun = true;
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
}
catch (Exception e) {
try {
curator.delete().guaranteed().forPath(path);
}
catch (Exception e1) {
log.info(e1, "Failed to delete node[%s], but ignoring exception.", path);
}
log.makeAlert(e, "Segment load/unload: uncaught exception.")
.addData("node", path)
.addData("nodeProperties", segment)
.emit();
}
break;
case CHILD_REMOVED:
log.info("%s was removed", event.getData().getPath());
break;
default:
log.info("Ignoring event[%s]", event);
}
}
}
);
loadQueueCache.start();
}
catch (Exception e) {
Throwables.propagateIfPossible(e, IOException.class);
throw Throwables.propagate(e);
}
started = true;
}
}
@LifecycleStop
public void stop()
{
log.info("Stopping ZkCoordinator for [%s]", me);
synchronized (lock) {
if (!started) {
return;
}
try {
loadQueueCache.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
loadQueueCache = null;
started = false;
}
}
}
public abstract void loadLocalCache();
public abstract DataSegmentChangeHandler getDataSegmentChangeHandler();
}

View File

@ -0,0 +1,8 @@
package io.druid.server.coordination;
/**
*/
public interface DataSegmentChangeCallback
{
public void execute();
}

View File

@ -25,6 +25,6 @@ import io.druid.timeline.DataSegment;
*/
public interface DataSegmentChangeHandler
{
public void addSegment(DataSegment segment);
public void removeSegment(DataSegment segment);
public void addSegment(DataSegment segment, DataSegmentChangeCallback callback);
public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback);
}
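
To show the new callback-passing contract, a hedged sketch of an implementation (not part of this PR). Real handlers such as `ZkCoordinator` below invoke the callback in a `finally` block so the caller can clean up the corresponding ZooKeeper node.

```java
package io.druid.server.coordination;

import io.druid.timeline.DataSegment;

// Hypothetical handler used only to illustrate the contract: signal completion
// through the callback whether or not the work succeeds.
public class LoggingSegmentChangeHandler implements DataSegmentChangeHandler
{
  @Override
  public void addSegment(DataSegment segment, DataSegmentChangeCallback callback)
  {
    try {
      System.out.println("Would load segment " + segment.getIdentifier());
    }
    finally {
      callback.execute(); // lets BaseZkCoordinator delete the load-queue node
    }
  }

  @Override
  public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback)
  {
    try {
      System.out.println("Would drop segment " + segment.getIdentifier());
    }
    finally {
      callback.execute();
    }
  }
}
```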

View File

@ -32,5 +32,5 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
})
public interface DataSegmentChangeRequest
{
public void go(DataSegmentChangeHandler handler);
public void go(DataSegmentChangeHandler handler, DataSegmentChangeCallback callback);
}

View File

@ -31,6 +31,7 @@ public class DruidServerMetadata
private final long maxSize;
private final String tier;
private final String type;
private final int priority;
@JsonCreator
public DruidServerMetadata(
@ -38,7 +39,8 @@ public class DruidServerMetadata
@JsonProperty("host") String host,
@JsonProperty("maxSize") long maxSize,
@JsonProperty("type") String type,
@JsonProperty("tier") String tier
@JsonProperty("tier") String tier,
@JsonProperty("priority") int priority
)
{
this.name = name;
@ -46,6 +48,7 @@ public class DruidServerMetadata
this.maxSize = maxSize;
this.tier = tier;
this.type = type;
this.priority = priority;
}
@JsonProperty
@ -78,6 +81,12 @@ public class DruidServerMetadata
return type;
}
@JsonProperty
public int getPriority()
{
return priority;
}
@Override
public String toString()
{
@ -87,6 +96,7 @@ public class DruidServerMetadata
", maxSize=" + maxSize +
", tier='" + tier + '\'' +
", type='" + type + '\'' +
", priority='" + priority + '\'' +
'}';
}
}

View File

@ -46,9 +46,9 @@ public class SegmentChangeRequestDrop implements DataSegmentChangeRequest
}
@Override
public void go(DataSegmentChangeHandler handler)
public void go(DataSegmentChangeHandler handler, DataSegmentChangeCallback callback)
{
handler.removeSegment(segment);
handler.removeSegment(segment, callback);
}
@Override

View File

@ -39,9 +39,9 @@ public class SegmentChangeRequestLoad implements DataSegmentChangeRequest
}
@Override
public void go(DataSegmentChangeHandler handler)
public void go(DataSegmentChangeHandler handler, DataSegmentChangeCallback callback)
{
handler.addSegment(segment);
handler.addSegment(segment, callback);
}
@JsonProperty

View File

@ -24,8 +24,8 @@ package io.druid.server.coordination;
public class SegmentChangeRequestNoop implements DataSegmentChangeRequest
{
@Override
public void go(DataSegmentChangeHandler handler)
public void go(DataSegmentChangeHandler handler, DataSegmentChangeCallback callback)
{
// do nothing
}
}

View File

@ -20,23 +20,14 @@
package io.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import java.io.File;
import java.io.IOException;
@ -44,23 +35,15 @@ import java.util.List;
/**
*/
public class ZkCoordinator implements DataSegmentChangeHandler
public class ZkCoordinator extends BaseZkCoordinator
{
private static final EmittingLogger log = new EmittingLogger(ZkCoordinator.class);
private final Object lock = new Object();
private final ObjectMapper jsonMapper;
private final SegmentLoaderConfig config;
private final ZkPathsConfig zkPaths;
private final DruidServerMetadata me;
private final DataSegmentAnnouncer announcer;
private final CuratorFramework curator;
private final ServerManager serverManager;
private volatile PathChildrenCache loadQueueCache;
private volatile boolean started;
@Inject
public ZkCoordinator(
ObjectMapper jsonMapper,
@ -72,129 +55,20 @@ public class ZkCoordinator implements DataSegmentChangeHandler
ServerManager serverManager
)
{
super(jsonMapper, zkPaths, me, curator);
this.jsonMapper = jsonMapper;
this.config = config;
this.zkPaths = zkPaths;
this.me = me;
this.announcer = announcer;
this.curator = curator;
this.serverManager = serverManager;
}
@LifecycleStart
public void start() throws IOException
{
log.info("Starting zkCoordinator for server[%s]", me);
synchronized (lock) {
if (started) {
return;
}
final String loadQueueLocation = ZKPaths.makePath(zkPaths.getLoadQueuePath(), me.getName());
final String servedSegmentsLocation = ZKPaths.makePath(zkPaths.getServedSegmentsPath(), me.getName());
final String liveSegmentsLocation = ZKPaths.makePath(zkPaths.getLiveSegmentsPath(), me.getName());
loadQueueCache = new PathChildrenCache(
curator,
loadQueueLocation,
true,
true,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build()
);
try {
config.getInfoDir().mkdirs();
curator.newNamespaceAwareEnsurePath(loadQueueLocation).ensure(curator.getZookeeperClient());
curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient());
curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient());
loadCache();
loadQueueCache.getListenable().addListener(
new PathChildrenCacheListener()
{
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{
final ChildData child = event.getData();
switch (event.getType()) {
case CHILD_ADDED:
final String path = child.getPath();
final DataSegmentChangeRequest segment = jsonMapper.readValue(
child.getData(), DataSegmentChangeRequest.class
);
log.info("New node[%s] with segmentClass[%s]", path, segment.getClass());
try {
segment.go(ZkCoordinator.this);
curator.delete().guaranteed().forPath(path);
log.info("Completed processing for node[%s]", path);
}
catch (Exception e) {
try {
curator.delete().guaranteed().forPath(path);
}
catch (Exception e1) {
log.info(e1, "Failed to delete node[%s], but ignoring exception.", path);
}
log.makeAlert(e, "Segment load/unload: uncaught exception.")
.addData("node", path)
.addData("nodeProperties", segment)
.emit();
}
break;
case CHILD_REMOVED:
log.info("%s was removed", event.getData().getPath());
break;
default:
log.info("Ignoring event[%s]", event);
}
}
}
);
loadQueueCache.start();
}
catch (Exception e) {
Throwables.propagateIfPossible(e, IOException.class);
throw Throwables.propagate(e);
}
started = true;
}
}
@LifecycleStop
public void stop()
{
log.info("Stopping ZkCoordinator with config[%s]", config);
synchronized (lock) {
if (!started) {
return;
}
try {
loadQueueCache.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
loadQueueCache = null;
started = false;
}
}
}
private void loadCache()
@Override
public void loadLocalCache()
{
final long start = System.currentTimeMillis();
File baseDir = config.getInfoDir();
if (!baseDir.exists()) {
if (!baseDir.exists() && !config.getInfoDir().mkdirs()) {
return;
}
@ -221,11 +95,27 @@ public class ZkCoordinator implements DataSegmentChangeHandler
}
}
addSegments(cachedSegments);
addSegments(
cachedSegments,
new DataSegmentChangeCallback()
{
@Override
public void execute()
{
log.info("Cache load took %,d ms", System.currentTimeMillis() - start);
}
}
);
}
@Override
public void addSegment(DataSegment segment)
public DataSegmentChangeHandler getDataSegmentChangeHandler()
{
return ZkCoordinator.this;
}
@Override
public void addSegment(DataSegment segment, DataSegmentChangeCallback callback)
{
try {
log.info("Loading segment %s", segment.getIdentifier());
@ -235,7 +125,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
loaded = serverManager.loadSegment(segment);
}
catch (Exception e) {
removeSegment(segment);
removeSegment(segment, callback);
throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier());
}
@ -246,7 +136,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
jsonMapper.writeValue(segmentInfoCacheFile, segment);
}
catch (IOException e) {
removeSegment(segment);
removeSegment(segment, callback);
throw new SegmentLoadingException(
e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
);
@ -260,16 +150,18 @@ public class ZkCoordinator implements DataSegmentChangeHandler
throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
}
}
}
catch (SegmentLoadingException e) {
log.makeAlert(e, "Failed to load segment for dataSource")
.addData("segment", segment)
.emit();
}
finally {
callback.execute();
}
}
public void addSegments(Iterable<DataSegment> segments)
public void addSegments(Iterable<DataSegment> segments, DataSegmentChangeCallback callback)
{
try {
final List<String> segmentFailures = Lists.newArrayList();
@ -284,7 +176,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
}
catch (Exception e) {
log.error(e, "Exception loading segment[%s]", segment.getIdentifier());
removeSegment(segment);
removeSegment(segment, callback);
segmentFailures.add(segment.getIdentifier());
continue;
}
@ -297,7 +189,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
}
catch (IOException e) {
log.error(e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile);
removeSegment(segment);
removeSegment(segment, callback);
segmentFailures.add(segment.getIdentifier());
continue;
}
@ -326,11 +218,14 @@ public class ZkCoordinator implements DataSegmentChangeHandler
.addData("segments", segments)
.emit();
}
finally {
callback.execute();
}
}
@Override
public void removeSegment(DataSegment segment)
public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback)
{
try {
serverManager.dropSegment(segment);
@ -347,26 +242,8 @@ public class ZkCoordinator implements DataSegmentChangeHandler
.addData("segment", segment)
.emit();
}
}
public void removeSegments(Iterable<DataSegment> segments)
{
try {
for (DataSegment segment : segments) {
serverManager.dropSegment(segment);
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
if (!segmentInfoCacheFile.delete()) {
log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile);
}
}
announcer.unannounceSegments(segments);
}
catch (Exception e) {
log.makeAlert(e, "Failed to remove segments")
.addData("segments", segments)
.emit();
finally {
callback.execute();
}
}
}

View File

@ -54,6 +54,13 @@ import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self;
import io.druid.segment.IndexIO;
import io.druid.server.DruidNode;
import io.druid.server.coordinator.helper.DruidCoordinatorBalancer;
import io.druid.server.coordinator.helper.DruidCoordinatorCleanup;
import io.druid.server.coordinator.helper.DruidCoordinatorHelper;
import io.druid.server.coordinator.helper.DruidCoordinatorLogger;
import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner;
import io.druid.server.coordinator.helper.DruidCoordinatorSegmentInfoLoader;
import io.druid.server.coordinator.helper.DruidCoordinatorSegmentMerger;
import io.druid.server.coordinator.rules.LoadRule;
import io.druid.server.coordinator.rules.Rule;
import io.druid.server.initialization.ZkPathsConfig;
@ -200,7 +207,9 @@ public class DruidCoordinator
List<Rule> rules = databaseRuleManager.getRulesWithDefault(segment.getDataSource());
for (Rule rule : rules) {
if (rule instanceof LoadRule && rule.appliesTo(segment, now)) {
expectedSegmentsInCluster.add(segment.getDataSource(), ((LoadRule) rule).getReplicants());
for (Integer numReplicants : ((LoadRule) rule).getTieredReplicants().values()) {
expectedSegmentsInCluster.add(segment.getDataSource(), numReplicants);
}
break;
}
}
@ -364,7 +373,7 @@ public class DruidCoordinator
new LoadPeonCallback()
{
@Override
protected void execute()
public void execute()
{
try {
if (curator.checkExists().forPath(toServedSegPath) != null &&
@ -568,7 +577,7 @@ public class DruidCoordinator
if (leader) {
theRunnable.run();
}
if (leader) { // (We might no longer be coordinator)
if (leader) { // (We might no longer be leader)
return ScheduledExecutors.Signal.REPEAT;
} else {
return ScheduledExecutors.Signal.STOP;
@ -768,7 +777,7 @@ public class DruidCoordinator
DruidServer input
)
{
return input.getType().equalsIgnoreCase("historical");
return !input.isRealtime();
}
}
);

View File

@ -21,7 +21,7 @@ package io.druid.server.coordinator;
/**
*/
public abstract class LoadPeonCallback
public interface LoadPeonCallback
{
protected abstract void execute();
public void execute();
}

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.coordinator;
package io.druid.server.coordinator.helper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -25,6 +25,14 @@ import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.common.guava.Comparators;
import com.metamx.emitter.EmittingLogger;
import io.druid.client.DruidServer;
import io.druid.server.coordinator.BalancerSegmentHolder;
import io.druid.server.coordinator.BalancerStrategy;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DruidCoordinator;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.server.coordinator.LoadPeonCallback;
import io.druid.server.coordinator.LoadQueuePeon;
import io.druid.server.coordinator.ServerHolder;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
@ -163,7 +171,7 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
callback = new LoadPeonCallback()
{
@Override
protected void execute()
public void execute()
{
Map<String, BalancerSegmentHolder> movingSegments = currentlyMovingSegments.get(toServer.getTier());
if (movingSegments != null) {

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.coordinator;
package io.druid.server.coordinator.helper;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
@ -25,6 +25,14 @@ import com.metamx.common.guava.Comparators;
import com.metamx.common.logger.Logger;
import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DruidCluster;
import io.druid.server.coordinator.DruidCoordinator;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.server.coordinator.LoadPeonCallback;
import io.druid.server.coordinator.LoadQueuePeon;
import io.druid.server.coordinator.ServerHolder;
import io.druid.server.coordinator.helper.DruidCoordinatorHelper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
@ -69,7 +77,7 @@ public class DruidCoordinatorCleanup implements DruidCoordinatorHelper
segment, new LoadPeonCallback()
{
@Override
protected void execute()
public void execute()
{
}
}

View File

@ -17,7 +17,9 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.coordinator;
package io.druid.server.coordinator.helper;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
/**
*/

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.coordinator;
package io.druid.server.coordinator.helper;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
@ -27,6 +27,11 @@ import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
import io.druid.collections.CountingMap;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DruidCluster;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.server.coordinator.LoadQueuePeon;
import io.druid.server.coordinator.ServerHolder;
import io.druid.timeline.DataSegment;
import java.util.Map;

View File

@ -17,10 +17,15 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.coordinator;
package io.druid.server.coordinator.helper;
import com.metamx.emitter.EmittingLogger;
import io.druid.db.DatabaseRuleManager;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DruidCluster;
import io.druid.server.coordinator.DruidCoordinator;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.server.coordinator.ReplicationThrottler;
import io.druid.server.coordinator.rules.Rule;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;

View File

@ -17,9 +17,11 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.coordinator;
package io.druid.server.coordinator.helper;
import com.metamx.common.logger.Logger;
import io.druid.server.coordinator.DruidCoordinator;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.timeline.DataSegment;
import java.util.Set;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.coordinator;
package io.druid.server.coordinator.helper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
@ -33,6 +33,9 @@ import com.metamx.common.Pair;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DatasourceWhitelist;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;

View File

@ -0,0 +1,66 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.coordinator.helper;
import com.metamx.emitter.EmittingLogger;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.segment.IndexIO;
import io.druid.server.coordinator.DatasourceWhitelist;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.timeline.DataSegment;
import java.util.concurrent.atomic.AtomicReference;
public class DruidCoordinatorVersionConverter implements DruidCoordinatorHelper
{
private static final EmittingLogger log = new EmittingLogger(DruidCoordinatorVersionConverter.class);
private final IndexingServiceClient indexingServiceClient;
private final AtomicReference<DatasourceWhitelist> whitelistRef;
public DruidCoordinatorVersionConverter(
IndexingServiceClient indexingServiceClient,
AtomicReference<DatasourceWhitelist> whitelistRef
)
{
this.indexingServiceClient = indexingServiceClient;
this.whitelistRef = whitelistRef;
}
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
DatasourceWhitelist whitelist = whitelistRef.get();
for (DataSegment dataSegment : params.getAvailableSegments()) {
if (whitelist == null || whitelist.contains(dataSegment.getDataSource())) {
final Integer binaryVersion = dataSegment.getBinaryVersion();
if (binaryVersion == null || binaryVersion < IndexIO.CURRENT_VERSION_ID) {
log.info("Upgrading version on segment[%s]", dataSegment.getIdentifier());
indexingServiceClient.upgradeSegment(dataSegment);
}
}
}
return params;
}
}
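
For orientation, a minimal sketch (not part of this commit) of the contract a coordinator helper such as the version converter above satisfies: each helper receives the coordinator's runtime params and returns them, possibly updated, for the next helper in the chain. The wrapper class and `runAll` method below are illustrative names only.

```java
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.server.coordinator.helper.DruidCoordinatorHelper;

import java.util.List;

public class CoordinatorHelperChainSketch
{
  // Threads the runtime params through each helper in order - the same
  // take-params, act, return-params contract the version converter implements.
  public static DruidCoordinatorRuntimeParams runAll(
      List<DruidCoordinatorHelper> helpers,
      DruidCoordinatorRuntimeParams params
  )
  {
    for (DruidCoordinatorHelper helper : helpers) {
      params = helper.run(params);
    }
    return params;
  }
}
```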

View File

@ -19,78 +19,24 @@
package io.druid.server.coordinator.rules;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Range;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
/**
*/
public class SizeLoadRule extends LoadRule
public class ForeverDropRule extends DropRule
{
private final long low;
private final long high;
private final Integer replicants;
private final String tier;
private final Range<Long> range;
@JsonCreator
public SizeLoadRule(
@JsonProperty("low") long low,
@JsonProperty("high") long high,
@JsonProperty("replicants") Integer replicants,
@JsonProperty("tier") String tier
)
{
this.low = low;
this.high = high;
this.replicants = replicants;
this.tier = tier;
this.range = Range.closedOpen(low, high);
}
@Override
@JsonProperty
public int getReplicants()
{
return replicants;
}
@Override
public int getReplicants(String tier)
{
return (this.tier.equalsIgnoreCase(tier)) ? replicants : 0;
}
@Override
@JsonProperty
public String getTier()
{
return tier;
}
@Override
public String getType()
{
return "loadBySize";
}
@JsonProperty
public long getLow()
{
return low;
}
@JsonProperty
public long getHigh()
{
return high;
return "dropForever";
}
@Override
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{
return range.contains(segment.getSize());
return true;
}
}

View File

@ -21,51 +21,49 @@ package io.druid.server.coordinator.rules;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Range;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import java.util.Map;
/**
*/
public class SizeDropRule extends DropRule
public class ForeverLoadRule extends LoadRule
{
private final long low;
private final long high;
private final Range<Long> range;
private final Map<String, Integer> tieredReplicants;
@JsonCreator
public SizeDropRule(
@JsonProperty("low") long low,
@JsonProperty("high") long high
public ForeverLoadRule(
@JsonProperty("tieredReplicants") Map<String, Integer> tieredReplicants
)
{
this.low = low;
this.high = high;
this.range = Range.closedOpen(low, high);
this.tieredReplicants = tieredReplicants;
}
@Override
@JsonProperty
public String getType()
{
return "dropBySize";
return "loadForever";
}
@Override
@JsonProperty
public long getLow()
public Map<String, Integer> getTieredReplicants()
{
return low;
return tieredReplicants;
}
@JsonProperty
public long getHigh()
@Override
public int getNumReplicants(String tier)
{
return high;
Integer retVal = tieredReplicants.get(tier);
return (retVal == null) ? 0 : retVal;
}
@Override
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{
return range.contains(segment.getSize());
return true;
}
}
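
A minimal usage sketch for the new `ForeverLoadRule` above; the tier names and replicant counts are examples, not defaults from this commit:

```java
import com.google.common.collect.ImmutableMap;
import io.druid.client.DruidServer;
import io.druid.server.coordinator.rules.ForeverLoadRule;

public class ForeverLoadRuleSketch
{
  public static void main(String[] args)
  {
    // Keep one replicant in a hypothetical "hot" tier and two in the default tier.
    ForeverLoadRule rule = new ForeverLoadRule(
        ImmutableMap.of("hot", 1, DruidServer.DEFAULT_TIER, 2)
    );

    System.out.println(rule.getNumReplicants("hot"));  // 1
    System.out.println(rule.getNumReplicants("cold")); // 0 - tiers not in the map get zero
  }
}
```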

View File

@ -21,11 +21,14 @@ package io.druid.server.coordinator.rules;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.logger.Logger;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.Map;
/**
*/
public class IntervalLoadRule extends LoadRule
@ -33,19 +36,25 @@ public class IntervalLoadRule extends LoadRule
private static final Logger log = new Logger(IntervalLoadRule.class);
private final Interval interval;
private final Integer replicants;
private final String tier;
private final Map<String, Integer> tieredReplicants;
@JsonCreator
public IntervalLoadRule(
@JsonProperty("interval") Interval interval,
@JsonProperty("load") Map<String, Integer> tieredReplicants,
// Replicants and tier are deprecated
@JsonProperty("replicants") Integer replicants,
@JsonProperty("tier") String tier
)
{
this.interval = interval;
this.replicants = (replicants == null) ? 2 : replicants;
this.tier = tier;
if (tieredReplicants != null) {
this.tieredReplicants = tieredReplicants;
} else { // Backwards compatible
this.tieredReplicants = ImmutableMap.of(tier, replicants);
}
}
@Override
@ -55,24 +64,17 @@ public class IntervalLoadRule extends LoadRule
return "loadByInterval";
}
@Override
@JsonProperty
public int getReplicants()
public Map<String, Integer> getTieredReplicants()
{
return replicants;
return tieredReplicants;
}
@Override
public int getReplicants(String tier)
public int getNumReplicants(String tier)
{
return (this.tier.equalsIgnoreCase(tier)) ? replicants : 0;
}
@Override
@JsonProperty
public String getTier()
{
return tier;
final Integer retVal = tieredReplicants.get(tier);
return retVal == null ? 0 : retVal;
}
@JsonProperty
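
A short sketch of the backwards-compatibility path in the constructor above: when no tiered replicants map is supplied, the deprecated `replicants`/`tier` pair is folded into a single-entry map. The interval and tier values below are illustrative only.

```java
import io.druid.server.coordinator.rules.IntervalLoadRule;
import org.joda.time.Interval;

public class IntervalLoadRuleCompatSketch
{
  public static void main(String[] args)
  {
    // Old-style rule: no tieredReplicants map, only the deprecated replicants + tier pair.
    IntervalLoadRule legacy = new IntervalLoadRule(
        new Interval("2012-01-01/2013-01-01"),
        null, // tieredReplicants not supplied
        2,    // deprecated "replicants"
        "hot" // deprecated "tier"
    );

    // The constructor folds the pair into the equivalent of {"hot": 2}.
    System.out.println(legacy.getNumReplicants("hot"));    // 2
    System.out.println(legacy.getNumReplicants("normal")); // 0
  }
}
```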

View File

@ -19,20 +19,20 @@
package io.druid.server.coordinator.rules;
import com.google.api.client.util.Maps;
import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.emitter.EmittingLogger;
import io.druid.server.coordinator.BalancerStrategy;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DruidCoordinator;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.server.coordinator.LoadPeonCallback;
import io.druid.server.coordinator.BalancerStrategy;
import io.druid.server.coordinator.ReplicationThrottler;
import io.druid.server.coordinator.ServerHolder;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -48,39 +48,50 @@ public abstract class LoadRule implements Rule
{
CoordinatorStats stats = new CoordinatorStats();
int expectedReplicants = getReplicants();
int totalReplicants = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier(), getTier());
int clusterReplicants = params.getSegmentReplicantLookup().getClusterReplicants(segment.getIdentifier(), getTier());
final Map<String, Integer> loadStatus = Maps.newHashMap();
for (Map.Entry<String, Integer> entry : getTieredReplicants().entrySet()) {
final String tier = entry.getKey();
final int expectedReplicants = entry.getValue();
MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(getTier());
if (serverQueue == null) {
log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", getTier()).emit();
return stats;
int totalReplicants = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier(), tier);
MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getServersByTier(tier);
if (serverQueue == null) {
log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", tier).emit();
return stats;
}
final List<ServerHolder> serverHolderList = Lists.newArrayList(serverQueue);
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
final BalancerStrategy strategy = params.getBalancerStrategyFactory().createBalancerStrategy(referenceTimestamp);
if (params.getAvailableSegments().contains(segment)) {
stats.accumulate(
assign(
params.getReplicationManager(),
tier,
expectedReplicants,
totalReplicants,
strategy,
serverHolderList,
segment
)
);
}
int clusterReplicants = params.getSegmentReplicantLookup()
.getClusterReplicants(segment.getIdentifier(), tier);
loadStatus.put(tier, expectedReplicants - clusterReplicants);
}
// Remove over-replication
stats.accumulate(drop(loadStatus, segment, params));
final List<ServerHolder> serverHolderList = new ArrayList<ServerHolder>(serverQueue);
final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
final BalancerStrategy strategy = params.getBalancerStrategyFactory().createBalancerStrategy(referenceTimestamp);
if (params.getAvailableSegments().contains(segment)) {
stats.accumulate(
assign(
params.getReplicationManager(),
expectedReplicants,
totalReplicants,
strategy,
serverHolderList,
segment
)
);
}
stats.accumulate(drop(expectedReplicants, clusterReplicants, segment, params));
return stats;
}
private CoordinatorStats assign(
final ReplicationThrottler replicationManager,
final String tier,
final int expectedReplicants,
int totalReplicants,
final BalancerStrategy strategy,
@ -89,11 +100,12 @@ public abstract class LoadRule implements Rule
)
{
final CoordinatorStats stats = new CoordinatorStats();
stats.addToTieredStat("assignedCount", tier, 0);
while (totalReplicants < expectedReplicants) {
boolean replicate = totalReplicants > 0;
if (replicate && !replicationManager.canCreateReplicant(getTier())) {
if (replicate && !replicationManager.canCreateReplicant(tier)) {
break;
}
@ -101,8 +113,8 @@ public abstract class LoadRule implements Rule
if (holder == null) {
log.warn(
"Not enough %s servers or node capacity to assign segment[%s]! Expected Replicants[%d]",
getTier(),
"Not enough [%s] servers or node capacity to assign segment[%s]! Expected Replicants[%d]",
tier,
segment.getIdentifier(),
expectedReplicants
);
@ -111,7 +123,7 @@ public abstract class LoadRule implements Rule
if (replicate) {
replicationManager.registerReplicantCreation(
getTier(), segment.getIdentifier(), holder.getServer().getHost()
tier, segment.getIdentifier(), holder.getServer().getHost()
);
}
@ -120,10 +132,10 @@ public abstract class LoadRule implements Rule
new LoadPeonCallback()
{
@Override
protected void execute()
public void execute()
{
replicationManager.unregisterReplicantCreation(
getTier(),
tier,
segment.getIdentifier(),
holder.getServer().getHost()
);
@ -131,7 +143,7 @@ public abstract class LoadRule implements Rule
}
);
stats.addToTieredStat("assignedCount", getTier(), 1);
stats.addToTieredStat("assignedCount", tier, 1);
++totalReplicants;
}
@ -139,30 +151,35 @@ public abstract class LoadRule implements Rule
}
private CoordinatorStats drop(
int expectedReplicants,
int clusterReplicants,
final Map<String, Integer> loadStatus,
final DataSegment segment,
final DruidCoordinatorRuntimeParams params
)
{
CoordinatorStats stats = new CoordinatorStats();
final ReplicationThrottler replicationManager = params.getReplicationManager();
if (!params.hasDeletionWaitTimeElapsed()) {
return stats;
}
// Make sure we have enough actual replicants in the cluster before doing anything
if (clusterReplicants < expectedReplicants) {
return stats;
// Make sure we have enough actual replicants in the correct tiers in the cluster before doing anything
for (Integer leftToLoad : loadStatus.values()) {
if (leftToLoad > 0) {
return stats;
}
}
Map<String, Integer> replicantsByType = params.getSegmentReplicantLookup().getClusterTiers(segment.getIdentifier());
final ReplicationThrottler replicationManager = params.getReplicationManager();
for (Map.Entry<String, Integer> entry : replicantsByType.entrySet()) {
String tier = entry.getKey();
int actualNumReplicantsForType = entry.getValue();
int expectedNumReplicantsForType = getReplicants(tier);
// Find all instances of this segment across tiers
Map<String, Integer> replicantsByTier = params.getSegmentReplicantLookup().getClusterTiers(segment.getIdentifier());
for (Map.Entry<String, Integer> entry : replicantsByTier.entrySet()) {
final String tier = entry.getKey();
int actualNumReplicantsForTier = entry.getValue();
int expectedNumReplicantsForTier = getNumReplicants(tier);
stats.addToTieredStat("droppedCount", tier, 0);
MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().get(tier);
if (serverQueue == null) {
@ -171,7 +188,7 @@ public abstract class LoadRule implements Rule
}
List<ServerHolder> droppedServers = Lists.newArrayList();
while (actualNumReplicantsForType > expectedNumReplicantsForType) {
while (actualNumReplicantsForTier > expectedNumReplicantsForTier) {
final ServerHolder holder = serverQueue.pollLast();
if (holder == null) {
log.warn("Wtf, holder was null? I have no servers serving [%s]?", segment.getIdentifier());
@ -179,14 +196,14 @@ public abstract class LoadRule implements Rule
}
if (holder.isServingSegment(segment)) {
if (expectedNumReplicantsForType > 0) { // don't throttle unless we are removing extra replicants
if (!replicationManager.canDestroyReplicant(getTier())) {
if (expectedNumReplicantsForTier > 0) { // don't throttle unless we are removing extra replicants
if (!replicationManager.canDestroyReplicant(tier)) {
serverQueue.add(holder);
break;
}
replicationManager.registerReplicantTermination(
getTier(),
tier,
segment.getIdentifier(),
holder.getServer().getHost()
);
@ -197,17 +214,17 @@ public abstract class LoadRule implements Rule
new LoadPeonCallback()
{
@Override
protected void execute()
public void execute()
{
replicationManager.unregisterReplicantTermination(
getTier(),
tier,
segment.getIdentifier(),
holder.getServer().getHost()
);
}
}
);
--actualNumReplicantsForType;
--actualNumReplicantsForTier;
stats.addToTieredStat("droppedCount", tier, 1);
}
droppedServers.add(holder);
@ -218,9 +235,7 @@ public abstract class LoadRule implements Rule
return stats;
}
public abstract int getReplicants();
public abstract Map<String, Integer> getTieredReplicants();
public abstract int getReplicants(String tier);
public abstract String getTier();
public abstract int getNumReplicants(String tier);
}
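
Since assignment and dropping now happen per tier, the resulting `CoordinatorStats` are keyed by tier as well. A small sketch of reading those counters back, mirroring the accessor chain used in the `LoadRuleTest` further below; the helper class and method name are assumptions, not code from this commit.

```java
import io.druid.server.coordinator.CoordinatorStats;

public class TierStatsSketch
{
  // Reads back one of the per-tier counters ("assignedCount" or "droppedCount")
  // that the reworked assign/drop accumulate. Assumes the counter has been
  // touched for the given tier, as LoadRule.run() now initializes it per tier.
  public static long perTierCount(CoordinatorStats stats, String statName, String tier)
  {
    return stats.getPerTierStats().get(statName).get(tier).get();
  }
}
```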

View File

@ -21,12 +21,15 @@ package io.druid.server.coordinator.rules;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.logger.Logger;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import java.util.Map;
/**
*/
public class PeriodLoadRule extends LoadRule
@ -34,19 +37,24 @@ public class PeriodLoadRule extends LoadRule
private static final Logger log = new Logger(PeriodLoadRule.class);
private final Period period;
private final Integer replicants;
private final String tier;
private final Map<String, Integer> tieredReplicants;
@JsonCreator
public PeriodLoadRule(
@JsonProperty("period") Period period,
@JsonProperty("replicants") Integer replicants,
@JsonProperty("tieredReplicants") Map<String, Integer> tieredReplicants,
// The following two vars need to be deprecated
@JsonProperty("replicants") int replicants,
@JsonProperty("tier") String tier
)
{
this.period = period;
this.replicants = (replicants == null) ? 2 : replicants;
this.tier = tier;
if (tieredReplicants != null) {
this.tieredReplicants = tieredReplicants;
} else { // Backwards compatible
this.tieredReplicants = ImmutableMap.of(tier, replicants);
}
}
@Override
@ -62,22 +70,18 @@ public class PeriodLoadRule extends LoadRule
return period;
}
@Override
@JsonProperty
public int getReplicants()
public Map<String, Integer> getTieredReplicants()
{
return replicants;
return tieredReplicants;
}
@Override
public int getReplicants(String tier)
public int getNumReplicants(String tier)
{
return (this.tier.equalsIgnoreCase(tier)) ? replicants : 0;
}
@JsonProperty
public String getTier()
{
return tier;
final Integer retVal = tieredReplicants.get(tier);
return retVal == null ? 0 : retVal;
}
@Override

View File

@ -33,12 +33,10 @@ import org.joda.time.DateTime;
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "loadByPeriod", value = PeriodLoadRule.class),
@JsonSubTypes.Type(name = "loadByInterval", value = IntervalLoadRule.class),
@JsonSubTypes.Type(name = "loadForever", value = ForeverLoadRule.class),
@JsonSubTypes.Type(name = "dropByPeriod", value = PeriodDropRule.class),
@JsonSubTypes.Type(name = "dropByInterval", value = IntervalDropRule.class),
@JsonSubTypes.Type(name = "loadBySize", value = SizeLoadRule.class),
@JsonSubTypes.Type(name = "dropBySize", value = SizeDropRule.class)
@JsonSubTypes.Type(name = "dropByInterval", value = IntervalDropRule.class)
})
public interface Rule
{
public String getType();
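
A hedged sketch of exercising the updated registry above: with `loadForever` registered as a subtype, a rule can be deserialized from JSON by its type name. This assumes the usual `type` discriminator on `Rule`; the tier names and counts are example values.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.server.coordinator.rules.Rule;

public class RuleJsonSketch
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new DefaultObjectMapper();

    // "type" picks the registered subtype; tieredReplicants maps tier -> replicant count.
    Rule rule = mapper.readValue(
        "{\"type\": \"loadForever\", \"tieredReplicants\": {\"hot\": 1, \"_default_tier\": 2}}",
        Rule.class
    );

    System.out.println(rule.getType()); // loadForever
  }
}
```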

View File

@ -66,6 +66,12 @@ public abstract class ZkPathsConfig
return defaultPath("coordinator");
}
@Config("druid.zk.paths.connectorPath")
public String getConnectorPath()
{
return defaultPath("connector");
}
@Config("druid.zk.paths.indexer.announcementsPath")
public String getIndexerAnnouncementPath()
{

View File

@ -36,8 +36,9 @@
margin: 0 10px 0 10px;
}
.delete_rule {
.delete_rule, .add_tier {
border-style : none;
color:#555;
background-color:#eee;
cursor: pointer;
}

View File

@ -6,6 +6,8 @@ var ruleTypes = [
"loadByPeriod",
"dropByInterval",
"dropByPeriod",
"loadForever",
"dropForever",
"JSON"
];
@ -17,6 +19,7 @@ function makeRuleDiv(rule) {
} else {
retVal += makeRuleComponents(rule.type) + makeRuleBody(rule);
}
retVal += "</div>";
return retVal;
}
@ -54,12 +57,18 @@ function makeRuleBody(rule) {
case "loadByPeriod":
retVal += makeLoadByPeriod(rule);
break;
case "loadForever":
retVal += makeLoadForever(rule);
break;
case "dropByInterval":
retVal += makeDropByInterval(rule);
break;
case "dropByPeriod":
retVal += makeDropByPeriod(rule);
break;
case "dropForever":
retVal += "";
break;
case "JSON":
retVal += makeJSON();
break;
@ -72,36 +81,67 @@ function makeRuleBody(rule) {
}
function makeLoadByInterval(rule) {
return "<span class='rule_label'>interval</span><input type='text' class='long_text' name='interval' " + "value='" + rule.interval + "'/>" +
"<span class='rule_label'>replicants</span><input type='text' class='short_text' name='replicants' " + "value='" + rule.replicants + "'/>" +
makeTiersDropdown(rule)
;
var retVal = "";
retVal += "<span class='rule_label'>interval</span><input type='text' class='long_text' name='interval' " + "value='" + rule.interval + "'/>";
retVal += "<button type='button' class='add_tier'>Add Another Tier</button>";
if (rule.tieredReplicants === undefined) {
retVal += makeTierLoad(null, 0);
}
for (var tier in rule.tieredReplicants) {
retVal += makeTierLoad(tier, rule.tieredReplicants[tier]);
}
return retVal;
}
function makeLoadByPeriod(rule) {
return "<span class='rule_label'>period</span><input type='text' name='period' " + "value='" + rule.period + "'/>" +
"<span class='rule_label'>replicants</span><input type='text' class='short_text' name='replicants' " + "value='" + rule.replicants + "'/>" +
makeTiersDropdown(rule)
;
var retVal = "";
retVal += "<span class='rule_label'>period</span><input type='text' name='period' " + "value='" + rule.period + "'/>";
retVal += "<button type='button' class='add_tier'>Add Another Tier</button>";
if (rule.tieredReplicants === undefined) {
retVal += makeTierLoad(null, 0);
}
for (var tier in rule.tieredReplicants) {
retVal += makeTierLoad(tier, rule.tieredReplicants[tier]);
}
return retVal;
}
function makeLoadForever(rule) {
var retVal = "";
retVal += "<button type='button' class='add_tier'>Add Another Tier</button>";
if (rule.tieredReplicants === undefined) {
retVal += makeTierLoad(null, 0);
}
for (var tier in rule.tieredReplicants) {
retVal += makeTierLoad(tier, rule.tieredReplicants[tier]);
}
return retVal;
}
function makeTierLoad(tier, val) {
return "<div class='rule_tier'>" +
"<span class='rule_label'>replicants</span><input type='text' class='short_text' name='replicants' " + "value='" + val + "'/>" +
makeTiersDropdown(tier) +
"</div>";
}
function makeDropByInterval(rule) {
return "<span class='rule_label'>interval</span><input type='text' name='interval' " + "value='" + rule.interval + "'/>";
return "<span class='rule_label'>interval</span><input type='text' name='interval' " + "value='" + rule.interval + "'/>";
}
function makeDropByPeriod(rule) {
return "<span class='rule_label'>period</span><input type='text' name='period' " + "value='" + rule.period + "'/>";
return "<span class='rule_label'>period</span><input type='text' name='period' " + "value='" + rule.period + "'/>";
}
function makeJSON() {
return "<span class='rule_label'>JSON</span><input type='text' class='very_long_text' name='JSON'/>";
}
function makeTiersDropdown(rule) {
function makeTiersDropdown(selTier) {
var retVal = "<span class='rule_label'>tier</span><select class='tiers' name='tier'>"
$.each(tiers, function(index, tier) {
if (rule.tier === tier) {
if (selTier === tier) {
retVal += "<option selected='selected' value='" + tier + "'>" + tier + "</option>";
} else {
retVal += "<option value='" + tier + "'>" + tier + "</option>";
@ -128,7 +168,7 @@ function getRules() {
function domToRule(domRule) {
var ruleType = $($(domRule).find(".rule_dropdown_types:first")).val();
var inputs = $($(domRule).find(".rule_body:first")).children(":not(span)");
var inputs = $($(domRule).find(".rule_body:first")).children("input");
// Special case for free form JSON
if (ruleType === "JSON") {
@ -141,6 +181,17 @@ function domToRule(domRule) {
var name = $(input).attr("name");
rule[name] = $(input).val();
});
var theTiers = $($(domRule).find(".rule_body:first")).children(".rule_tier");
var tieredReplicants = {};
$.each(theTiers, function(index, theTier) {
var tierName = $(theTier).find("select").val();
var replicants = $(theTier).find("[name=replicants]").val();
tieredReplicants[tierName] = replicants;
});
rule.tieredReplicants = tieredReplicants;
return rule;
}
@ -237,6 +288,10 @@ $(document).ready(function() {
$(event.target).parent(".rule").remove();
});
$(".add_tier").live("click", function(event) {
$(event.target).parent().append(makeTierLoad(null, 0));
});
$("#create_new_rule").click(function (event) {
$('#rules_list').prepend(makeRuleDiv());
});

View File

@ -104,12 +104,12 @@ public class DirectDruidClientTest
);
QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer(
null,
new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 0),
client1
);
serverSelector.addServer(queryableDruidServer1);
QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer(
null,
new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 0),
client2
);
serverSelector.addServer(queryableDruidServer2);

View File

@ -93,7 +93,8 @@ public class BatchServerInventoryViewTest
"host",
Long.MAX_VALUE,
"type",
"tier"
"tier",
0
),
new BatchDataSegmentAnnouncerConfig()
{

View File

@ -0,0 +1,234 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.bridge;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.Lifecycle;
import io.druid.client.BatchServerInventoryView;
import io.druid.client.DruidServer;
import io.druid.client.ServerView;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.curator.announcement.Announcer;
import io.druid.db.DatabaseSegmentManager;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.realtime.DbSegmentPublisher;
import io.druid.server.DruidNode;
import io.druid.server.coordination.BatchDataSegmentAnnouncer;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig;
import junit.framework.Assert;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.easymock.EasyMock;
import org.joda.time.Duration;
import org.junit.Test;
import java.util.Arrays;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
public class DruidClusterBridgeTest
{
@Test
public void testRun() throws Exception
{
TestingCluster localCluster = new TestingCluster(1);
localCluster.start();
CuratorFramework localCf = CuratorFrameworkFactory.builder()
.connectString(localCluster.getConnectString())
.retryPolicy(new ExponentialBackoffRetry(1, 10))
.compressionProvider(
new PotentiallyGzippedCompressionProvider(
false
)
)
.build();
localCf.start();
TestingCluster remoteCluster = new TestingCluster(1);
remoteCluster.start();
CuratorFramework remoteCf = CuratorFrameworkFactory.builder()
.connectString(remoteCluster.getConnectString())
.retryPolicy(new ExponentialBackoffRetry(1, 10))
.compressionProvider(
new PotentiallyGzippedCompressionProvider(
false
)
)
.build();
remoteCf.start();
ObjectMapper jsonMapper = new DefaultObjectMapper();
DruidClusterBridgeConfig config = new DruidClusterBridgeConfig()
{
@Override
public String getTier()
{
return DruidServer.DEFAULT_TIER;
}
@Override
public Duration getStartDelay()
{
return new Duration(0);
}
@Override
public Duration getPeriod()
{
return new Duration(Long.MAX_VALUE);
}
@Override
public String getBrokerServiceName()
{
return "testz0rz";
}
@Override
public int getPriority()
{
return 0;
}
};
ScheduledExecutorFactory factory = ScheduledExecutors.createFactory(new Lifecycle());
DruidNode me = new DruidNode(
"me",
"localhost",
8080
);
AtomicReference<LeaderLatch> leaderLatch = new AtomicReference<>(new LeaderLatch(localCf, "test"));
ZkPathsConfig zkPathsConfig = new ZkPathsConfig()
{
@Override
public String getZkBasePath()
{
return "/druid";
}
};
DruidServerMetadata metadata = new DruidServerMetadata(
"test",
"localhost",
1000,
"bridge",
DruidServer.DEFAULT_TIER,
0
);
DbSegmentPublisher dbSegmentPublisher = EasyMock.createMock(DbSegmentPublisher.class);
DatabaseSegmentManager databaseSegmentManager = EasyMock.createMock(DatabaseSegmentManager.class);
ServerView serverView = EasyMock.createMock(ServerView.class);
BridgeZkCoordinator bridgeZkCoordinator = new BridgeZkCoordinator(
jsonMapper,
zkPathsConfig,
metadata,
remoteCf,
dbSegmentPublisher,
databaseSegmentManager,
serverView
);
Announcer announcer = new Announcer(remoteCf, Executors.newSingleThreadExecutor());
announcer.start();
announcer.announce(zkPathsConfig.getAnnouncementsPath() + "/" + me.getHost(), jsonMapper.writeValueAsBytes(me));
BatchDataSegmentAnnouncer batchDataSegmentAnnouncer = EasyMock.createMock(BatchDataSegmentAnnouncer.class);
BatchServerInventoryView batchServerInventoryView = EasyMock.createMock(BatchServerInventoryView.class);
EasyMock.expect(batchServerInventoryView.getInventory()).andReturn(
Arrays.asList(
new DruidServer("1", "localhost", 117, "historical", DruidServer.DEFAULT_TIER, 0),
new DruidServer("2", "localhost", 1, "historical", DruidServer.DEFAULT_TIER, 0)
)
);
batchServerInventoryView.registerSegmentCallback(
EasyMock.<Executor>anyObject(),
EasyMock.<ServerView.SegmentCallback>anyObject()
);
batchServerInventoryView.registerServerCallback(
EasyMock.<Executor>anyObject(),
EasyMock.<ServerView.ServerCallback>anyObject()
);
EasyMock.expectLastCall();
batchServerInventoryView.start();
EasyMock.expectLastCall();
EasyMock.replay(batchServerInventoryView);
DruidClusterBridge bridge = new DruidClusterBridge(
jsonMapper,
config,
factory,
me,
localCf,
leaderLatch,
bridgeZkCoordinator,
announcer,
batchDataSegmentAnnouncer,
batchServerInventoryView
);
bridge.start();
int retry = 0;
while (!bridge.isLeader()) {
if (retry > 5) {
throw new ISE("Unable to become leader");
}
Thread.sleep(100);
retry++;
}
String path = "/druid/announcements/localhost:8080";
retry = 0;
while (remoteCf.checkExists().forPath(path) == null) {
if (retry > 5) {
throw new ISE("Unable to announce");
}
Thread.sleep(100);
retry++;
}
DruidServerMetadata announced = jsonMapper.readValue(
remoteCf.getData().forPath(path),
DruidServerMetadata.class
);
Assert.assertEquals(118, announced.getMaxSize());
EasyMock.verify(batchServerInventoryView);
}
}

View File

@ -83,7 +83,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
MoreExecutors.sameThreadExecutor()
);
final DruidServerMetadata me = new DruidServerMetadata("dummyServer", "dummyHost", 0, "dummyType", "normal");
final DruidServerMetadata me = new DruidServerMetadata("dummyServer", "dummyHost", 0, "dummyType", "normal", 0);
final ZkPathsConfig zkPaths = new ZkPathsConfig()
{

View File

@ -93,7 +93,8 @@ public class BatchDataSegmentAnnouncerTest
"host",
Long.MAX_VALUE,
"type",
"tier"
"tier",
0
),
new BatchDataSegmentAnnouncerConfig()
{

View File

@ -29,6 +29,7 @@ import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.DruidServer;
import io.druid.db.DatabaseRuleManager;
import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner;
import io.druid.server.coordinator.rules.PeriodLoadRule;
import io.druid.server.coordinator.rules.Rule;
import io.druid.timeline.DataSegment;
@ -53,7 +54,7 @@ public class DruidCoordinatorBalancerProfiler
Map<String, DataSegment> segments = Maps.newHashMap();
ServiceEmitter emitter;
DatabaseRuleManager manager;
PeriodLoadRule loadRule = new PeriodLoadRule(new Period("P5000Y"), 3, "normal");
PeriodLoadRule loadRule = new PeriodLoadRule(new Period("P5000Y"), null, 3, "normal");
List<Rule> rules = ImmutableList.<Rule>of(loadRule);
@Before

View File

@ -20,6 +20,7 @@
package io.druid.server.coordinator;
import io.druid.client.DruidServer;
import io.druid.server.coordinator.helper.DruidCoordinatorBalancer;
import io.druid.timeline.DataSegment;
public class DruidCoordinatorBalancerTester extends DruidCoordinatorBalancer
@ -59,7 +60,7 @@ public class DruidCoordinatorBalancerTester extends DruidCoordinatorBalancer
loadPeon.loadSegment(segment.getSegment(), new LoadPeonCallback()
{
@Override
protected void execute()
public void execute()
{
}
});

View File

@ -31,6 +31,7 @@ import com.metamx.emitter.service.ServiceEventBuilder;
import io.druid.client.DruidServer;
import io.druid.db.DatabaseRuleManager;
import io.druid.segment.IndexIO;
import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner;
import io.druid.server.coordinator.rules.IntervalLoadRule;
import io.druid.server.coordinator.rules.IntervalDropRule;
import io.druid.server.coordinator.rules.Rule;
@ -114,9 +115,9 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), 1, "hot"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "normal"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), 1, "cold")
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), null, 1, "hot"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), null, 1, "normal"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), null, 1, "cold")
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
@ -132,7 +133,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostHot",
1000,
"historical",
"hot"
"hot",
0
),
mockPeon
)
@ -147,7 +149,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostNorm",
1000,
"historical",
"normal"
"normal",
0
),
mockPeon
)
@ -162,7 +165,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostCold",
1000,
"historical",
"cold"
"cold",
0
),
mockPeon
)
@ -210,8 +214,8 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), 2, "hot"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), 1, "cold")
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T06:00:00.000Z"), null, 2, "hot"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), null, 1, "cold")
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
@ -227,7 +231,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostHot",
1000,
"historical",
"hot"
"hot",
0
),
mockPeon
),
@ -237,7 +242,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostHot2",
1000,
"historical",
"hot"
"hot",
0
),
mockPeon
)
@ -252,7 +258,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostCold",
1000,
"historical",
"cold"
"cold",
0
),
mockPeon
)
@ -299,8 +306,8 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), 1, "normal")
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), null, 1, "hot"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), null, 1, "normal")
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
@ -310,7 +317,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostNorm",
1000,
"historical",
"normal"
"normal",
0
);
for (DataSegment availableSegment : availableSegments) {
normServer.addDataSegment(availableSegment.getIdentifier(), availableSegment);
@ -327,7 +335,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostHot",
1000,
"historical",
"hot"
"hot",
0
),
mockPeon
)
@ -382,8 +391,8 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), 1, "normal")
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), null, 1, "hot"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"), null, 1, "normal")
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
@ -399,7 +408,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostNorm",
1000,
"historical",
"normal"
"normal",
0
),
mockPeon
)
@ -433,7 +443,7 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-02T00:00:00.000Z/2012-01-03T00:00:00.000Z"), 1, "normal")
new IntervalLoadRule(new Interval("2012-01-02T00:00:00.000Z/2012-01-03T00:00:00.000Z"), null, 1, "normal")
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
@ -449,7 +459,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostNorm",
1000,
"historical",
"normal"
"normal",
0
),
mockPeon
)
@ -487,7 +498,7 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "normal"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), null, 1, "normal"),
new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
).atLeastOnce();
@ -498,7 +509,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostNorm",
1000,
"historical",
"normal"
"normal",
0
);
for (DataSegment segment : availableSegments) {
server.addDataSegment(segment.getIdentifier(), segment);
@ -548,7 +560,7 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "normal"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), null, 1, "normal"),
new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
).atLeastOnce();
@ -559,7 +571,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostNorm",
1000,
"historical",
"normal"
"normal",
0
);
server1.addDataSegment(availableSegments.get(0).getIdentifier(), availableSegments.get(0));
@ -568,7 +581,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostNorm2",
1000,
"historical",
"normal"
"normal",
0
);
for (DataSegment segment : availableSegments) {
server2.addDataSegment(segment.getIdentifier(), segment);
@ -625,7 +639,7 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), null, 1, "hot"),
new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
).atLeastOnce();
@ -636,7 +650,8 @@ public class DruidCoordinatorRuleRunnerTest
"host1",
1000,
"historical",
"hot"
"hot",
0
);
server1.addDataSegment(availableSegments.get(0).getIdentifier(), availableSegments.get(0));
DruidServer server2 = new DruidServer(
@ -644,7 +659,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostNorm2",
1000,
"historical",
"normal"
"normal",
0
);
for (DataSegment segment : availableSegments) {
server2.addDataSegment(segment.getIdentifier(), segment);
@ -704,7 +720,7 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), 1, "hot"),
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T12:00:00.000Z"), null, 1, "hot"),
new IntervalDropRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"))
)
).atLeastOnce();
@ -715,14 +731,16 @@ public class DruidCoordinatorRuleRunnerTest
"host1",
1000,
"historical",
"hot"
"hot",
0
);
DruidServer server2 = new DruidServer(
"serverNorm2",
"hostNorm2",
1000,
"historical",
"normal"
"normal",
0
);
for (DataSegment segment : availableSegments) {
server2.addDataSegment(segment.getIdentifier(), segment);
@ -775,7 +793,7 @@ public class DruidCoordinatorRuleRunnerTest
{
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T01:00:00.000Z"), 0, "normal")
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2012-01-01T01:00:00.000Z"), null, 0, "normal")
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
@ -785,7 +803,8 @@ public class DruidCoordinatorRuleRunnerTest
"host1",
1000,
"historical",
"normal"
"normal",
0
);
server1.addDataSegment(availableSegments.get(0).getIdentifier(), availableSegments.get(0));
DruidServer server2 = new DruidServer(
@ -793,7 +812,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostNorm2",
1000,
"historical",
"normal"
"normal",
0
);
server2.addDataSegment(availableSegments.get(1).getIdentifier(), availableSegments.get(1));
DruidServer server3 = new DruidServer(
@ -801,7 +821,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostNorm3",
1000,
"historical",
"normal"
"normal",
0
);
server3.addDataSegment(availableSegments.get(1).getIdentifier(), availableSegments.get(1));
server3.addDataSegment(availableSegments.get(2).getIdentifier(), availableSegments.get(2));
@ -876,7 +897,7 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2013-01-01T00:00:00.000Z"), 2, "hot")
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2013-01-01T00:00:00.000Z"), null, 2, "hot")
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
@ -892,7 +913,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostHot",
1000,
"historical",
"hot"
"hot",
0
),
mockPeon
),
@ -902,7 +924,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostHot2",
1000,
"historical",
"hot"
"hot",
0
),
mockPeon
)
@ -967,7 +990,7 @@ public class DruidCoordinatorRuleRunnerTest
EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.<String>anyObject())).andReturn(
Lists.<Rule>newArrayList(
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2013-01-02T00:00:00.000Z"), 1, "normal")
new IntervalLoadRule(new Interval("2012-01-01T00:00:00.000Z/2013-01-02T00:00:00.000Z"), null, 1, "normal")
)
).atLeastOnce();
EasyMock.replay(databaseRuleManager);
@ -991,7 +1014,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostNorm1",
1000,
"historical",
"normal"
"normal",
0
);
for (DataSegment availableSegment : longerAvailableSegments) {
server1.addDataSegment(availableSegment.getIdentifier(), availableSegment);
@ -1001,7 +1025,8 @@ public class DruidCoordinatorRuleRunnerTest
"hostNorm2",
1000,
"historical",
"normal"
"normal",
0
);
for (DataSegment availableSegment : longerAvailableSegments) {
server2.addDataSegment(availableSegment.getIdentifier(), availableSegment);

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.server.coordinator.helper.DruidCoordinatorSegmentMerger;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.LinearShardSpec;
import junit.framework.Assert;

View File

@ -0,0 +1,274 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server.coordinator.rules;
import com.google.api.client.util.Lists;
import com.google.api.client.util.Maps;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import io.druid.client.DruidServer;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DruidCluster;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.server.coordinator.LoadPeonCallback;
import io.druid.server.coordinator.LoadQueuePeon;
import io.druid.server.coordinator.ReplicationThrottler;
import io.druid.server.coordinator.SegmentReplicantLookup;
import io.druid.server.coordinator.ServerHolder;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.Map;
/**
*/
public class LoadRuleTest
{
private LoadQueuePeon mockPeon;
private ReplicationThrottler throttler;
private DataSegment segment;
@Before
public void setUp() throws Exception
{
mockPeon = EasyMock.createMock(LoadQueuePeon.class);
throttler = new ReplicationThrottler(1, 1);
for (String tier : Arrays.asList("hot", DruidServer.DEFAULT_TIER)) {
throttler.updateReplicationState(tier);
throttler.updateTerminationState(tier);
}
segment = new DataSegment(
"foo",
new Interval("0/3000"),
new DateTime().toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NoneShardSpec(),
0,
0
);
}
@After
public void tearDown() throws Exception
{
EasyMock.verify(mockPeon);
}
@Test
public void testLoad() throws Exception
{
mockPeon.loadSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).atLeastOnce();
EasyMock.replay(mockPeon);
LoadRule rule = new LoadRule()
{
private final Map<String, Integer> tiers = ImmutableMap.of(
"hot", 1,
DruidServer.DEFAULT_TIER, 2
);
@Override
public Map<String, Integer> getTieredReplicants()
{
return tiers;
}
@Override
public int getNumReplicants(String tier)
{
return tiers.get(tier);
}
@Override
public String getType()
{
return "test";
}
@Override
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{
return true;
}
};
DruidCluster druidCluster = new DruidCluster(
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
new DruidServer(
"serverHot",
"hostHot",
1000,
"historical",
"hot",
0
),
mockPeon
)
)
),
DruidServer.DEFAULT_TIER,
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
new DruidServer(
"serverNorm",
"hostNorm",
1000,
"historical",
DruidServer.DEFAULT_TIER,
0
),
mockPeon
)
)
)
)
);
CoordinatorStats stats = rule.run(
null,
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
.withReplicationManager(throttler)
.withAvailableSegments(Arrays.asList(segment)).build(),
segment
);
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 1);
Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get(DruidServer.DEFAULT_TIER).get() == 2);
}
@Test
public void testDrop() throws Exception
{
mockPeon.dropSegment(EasyMock.<DataSegment>anyObject(), EasyMock.<LoadPeonCallback>anyObject());
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(Sets.<DataSegment>newHashSet()).atLeastOnce();
EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(0L).anyTimes();
EasyMock.replay(mockPeon);
LoadRule rule = new LoadRule()
{
private final Map<String, Integer> tiers = ImmutableMap.of(
"hot", 0,
DruidServer.DEFAULT_TIER, 0
);
@Override
public Map<String, Integer> getTieredReplicants()
{
return tiers;
}
@Override
public int getNumReplicants(String tier)
{
return tiers.get(tier);
}
@Override
public String getType()
{
return "test";
}
@Override
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{
return true;
}
};
DruidServer server1 = new DruidServer(
"serverHot",
"hostHot",
1000,
"historical",
"hot",
0
);
server1.addDataSegment(segment.getIdentifier(), segment);
DruidServer server2 = new DruidServer(
"serverNorm",
"hostNorm",
1000,
"historical",
DruidServer.DEFAULT_TIER,
0
);
server2.addDataSegment(segment.getIdentifier(), segment);
DruidCluster druidCluster = new DruidCluster(
ImmutableMap.of(
"hot",
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server1,
mockPeon
)
)
),
DruidServer.DEFAULT_TIER,
MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create(
Arrays.asList(
new ServerHolder(
server2,
mockPeon
)
)
)
)
);
CoordinatorStats stats = rule.run(
null,
DruidCoordinatorRuntimeParams.newBuilder()
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withReplicationManager(throttler)
.withAvailableSegments(Arrays.asList(segment)).build(),
segment
);
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("hot").get() == 1);
Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get(DruidServer.DEFAULT_TIER).get() == 1);
}
}

View File

@ -42,6 +42,7 @@ public class PeriodLoadRuleTest
DateTime now = new DateTime("2013-01-01");
PeriodLoadRule rule = new PeriodLoadRule(
new Period("P5000Y"),
null,
0,
""
);
@ -57,6 +58,7 @@ public class PeriodLoadRuleTest
DateTime now = new DateTime("2012-12-31T01:00:00");
PeriodLoadRule rule = new PeriodLoadRule(
new Period("P1M"),
null,
0,
""
);

View File

@ -0,0 +1,178 @@
package io.druid.cli;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.druid.concurrent.Execs;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.curator.announcement.Announcer;
import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.db.DatabaseSegmentManager;
import io.druid.db.DatabaseSegmentManagerConfig;
import io.druid.db.DatabaseSegmentManagerProvider;
import io.druid.guice.ConfigProvider;
import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.ManageLifecycleLast;
import io.druid.guice.NodeTypeConfig;
import io.druid.query.QuerySegmentWalker;
import io.druid.server.QueryResource;
import io.druid.server.bridge.Bridge;
import io.druid.server.bridge.BridgeCuratorConfig;
import io.druid.server.bridge.BridgeQuerySegmentWalker;
import io.druid.server.bridge.BridgeZkCoordinator;
import io.druid.server.bridge.DruidClusterBridge;
import io.druid.server.bridge.DruidClusterBridgeConfig;
import io.druid.server.coordination.AbstractDataSegmentAnnouncer;
import io.druid.server.coordination.BatchDataSegmentAnnouncer;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
import io.druid.server.initialization.JettyServerInitializer;
import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.eclipse.jetty.server.Server;
import java.util.List;
/**
*/
@Command(
name = "bridge",
description = "This is a highly experimental node to use at your own discretion"
)
public class CliBridge extends ServerRunnable
{
private static final Logger log = new Logger(CliBridge.class);
public CliBridge()
{
super(log);
}
@Override
protected List<Object> getModules()
{
return ImmutableList.<Object>of(
new Module()
{
@Override
public void configure(Binder binder)
{
ConfigProvider.bind(binder, BridgeCuratorConfig.class);
binder.bind(BridgeZkCoordinator.class).in(ManageLifecycle.class);
binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("bridge"));
JsonConfigProvider.bind(binder, "druid.manager.segments", DatabaseSegmentManagerConfig.class);
binder.bind(DatabaseSegmentManager.class)
.toProvider(DatabaseSegmentManagerProvider.class)
.in(ManageLifecycle.class);
binder.bind(QuerySegmentWalker.class).to(BridgeQuerySegmentWalker.class).in(LazySingleton.class);
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
Jerseys.addResource(binder, QueryResource.class);
LifecycleModule.register(binder, QueryResource.class);
ConfigProvider.bind(binder, DruidClusterBridgeConfig.class);
binder.bind(DruidClusterBridge.class);
LifecycleModule.register(binder, DruidClusterBridge.class);
LifecycleModule.register(binder, BridgeZkCoordinator.class);
LifecycleModule.register(binder, Server.class);
}
@Provides
@LazySingleton
@Bridge
public CuratorFramework getBridgeCurator(final BridgeCuratorConfig bridgeCuratorConfig, Lifecycle lifecycle)
{
final CuratorFramework framework =
CuratorFrameworkFactory.builder()
.connectString(bridgeCuratorConfig.getParentZkHosts())
.sessionTimeoutMs(bridgeCuratorConfig.getZkSessionTimeoutMs())
.retryPolicy(new BoundedExponentialBackoffRetry(1000, 45000, 30))
.compressionProvider(
new PotentiallyGzippedCompressionProvider(
bridgeCuratorConfig.enableCompression()
)
)
.build();
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
log.info("Starting Curator for %s", bridgeCuratorConfig.getParentZkHosts());
framework.start();
}
@Override
public void stop()
{
log.info("Stopping Curator");
framework.close();
}
}
);
return framework;
}
@Provides
@ManageLifecycle
public ServerDiscoverySelector getServerDiscoverySelector(
DruidClusterBridgeConfig config,
ServerDiscoveryFactory factory
)
{
return factory.createSelector(config.getBrokerServiceName());
}
@Provides
@ManageLifecycle
@Bridge
public Announcer getBridgeAnnouncer(
@Bridge CuratorFramework curator
)
{
return new Announcer(curator, Execs.singleThreaded("BridgeAnnouncer-%s"));
}
@Provides
@ManageLifecycleLast
@Bridge
public AbstractDataSegmentAnnouncer getBridgeDataSegmentAnnouncer(
DruidServerMetadata metadata,
BatchDataSegmentAnnouncerConfig config,
ZkPathsConfig zkPathsConfig,
@Bridge Announcer announcer,
ObjectMapper jsonMapper
)
{
return new BatchDataSegmentAnnouncer(
metadata,
config,
zkPathsConfig,
announcer,
jsonMapper
);
}
}
);
}
}

View File

@ -51,7 +51,8 @@ public class Main
.withDefaultCommand(Help.class)
.withCommands(
CliCoordinator.class, CliHistorical.class, CliBroker.class,
CliRealtime.class, CliOverlord.class, CliMiddleManager.class
CliRealtime.class, CliOverlord.class, CliMiddleManager.class,
CliBridge.class
);
builder.withGroup("example")