update internal-discovery Listener for node list and use same at router and coordinator (#4697)

* update LookupCoordinatorManager to use internal discovery to discover lookup nodes

* router:use internal-discovery to discover brokers

* minor [Curator]DruidDiscoveryProvider refactoring

* add initialized() method to DruidNodeDiscovery.Listener

* update HttpServerInventoryView to use initialized() and call segment callback initialization asynchronously

* Revert "update HttpServerInventoryView to use initialized() and call segment callback initialization asynchronously"

This reverts commit f796e441221fe8b0e9df87fdec6c9f47bcedd890.

* Revert "add initialized() method to DruidNodeDiscovery.Listener"

This reverts commit f0661541d073683f28fce2dd4f30ec37db90deb0.

* minor refactoring removing synchronized from DruidNodeDiscoveryProvider

* updated DruidNodeDiscovery.Listener contract to take List of nodes and first call marks initialization

* update HttpServerInventoryView to handle new contract and handle initialization

* router update to handle updated listener contract

* document DruidNodeDiscovery.Listener contract

* fix forbidden-api error

* change log level to info for unknown path children cache events in CuratorDruidNodeDiscoveryProvider

* announce broker only after segment inventory is initialized
This commit is contained in:
Himanshu 2017-08-25 11:28:15 -05:00 committed by cheddar
parent 4f61dc66a9
commit 74538c3288
23 changed files with 876 additions and 774 deletions

View File

@ -68,6 +68,7 @@ import javax.servlet.http.HttpServletResponse;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.URL; import java.net.URL;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -104,10 +105,10 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
private volatile ExecutorService executor; private volatile ExecutorService executor;
// a queue of queryable server names for which worker threads in executor initiate the segment list call i.e. // the work queue, all items in this are sequentially processed by main thread setup in start()
// DruidServerHolder.updateSegmentsListAsync(..) which updates the segment list asynchronously and adds itself // used to call inventoryInitialized on all SegmentCallbacks and
// to this queue again for next update. // for keeping segment list for each queryable server uptodate.
private final BlockingQueue<String> queue = new LinkedBlockingDeque<>(); private final BlockingQueue<Runnable> queue = new LinkedBlockingDeque<>();
private final HttpClient httpClient; private final HttpClient httpClient;
private final ObjectMapper smileMapper; private final ObjectMapper smileMapper;
@ -161,10 +162,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
try { try {
DruidServerHolder holder = servers.get(queue.take()); queue.take().run();
if (holder != null) {
holder.updateSegmentsListAsync();
}
} }
catch (InterruptedException ex) { catch (InterruptedException ex) {
log.info("main thread interrupted, served segments list is not synced anymore."); log.info("main thread interrupted, served segments list is not synced anymore.");
@ -184,17 +182,27 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
druidNodeDiscovery.registerListener( druidNodeDiscovery.registerListener(
new DruidNodeDiscovery.Listener() new DruidNodeDiscovery.Listener()
{ {
private volatile boolean initialized = false;
@Override @Override
public void nodeAdded(DiscoveryDruidNode node) public void nodesAdded(List<DiscoveryDruidNode> nodes)
{ {
serverAddedOrUpdated(toDruidServer(node)); nodes.forEach(
node -> serverAddedOrUpdated(toDruidServer(node))
);
if (!initialized) {
initialized = true;
queue.add(HttpServerInventoryView.this::serverInventoryInitialized);
}
} }
@Override @Override
public void nodeRemoved(DiscoveryDruidNode node) public void nodesRemoved(List<DiscoveryDruidNode> nodes)
{ {
serverRemoved(toDruidServer(node)); nodes.forEach(
node -> serverRemoved(toDruidServer(node))
);
} }
private DruidServer toDruidServer(DiscoveryDruidNode node) private DruidServer toDruidServer(DiscoveryDruidNode node)
@ -572,7 +580,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
log.error(ex, "error processing segment list response from server [%s]", druidServer.getName()); log.error(ex, "error processing segment list response from server [%s]", druidServer.getName());
} }
finally { finally {
queue.add(druidServer.getName()); addNextSyncToWorkQueue(druidServer.getName());
} }
} }
@ -611,7 +619,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
} }
} }
finally { finally {
queue.add(druidServer.getName()); addNextSyncToWorkQueue(druidServer.getName());
} }
} }
}, },
@ -621,7 +629,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
return future; return future;
} }
catch (Throwable th) { catch (Throwable th) {
queue.add(druidServer.getName()); addNextSyncToWorkQueue(druidServer.getName());
String logMsg = StringUtils.nonStrictFormat( String logMsg = StringUtils.nonStrictFormat(
"Fatal error while fetching segment list from server [%s].", druidServer.getName() "Fatal error while fetching segment list from server [%s].", druidServer.getName()
@ -646,6 +654,18 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
} }
} }
private void addNextSyncToWorkQueue(final String serverId)
{
queue.add(
() -> {
DruidServerHolder holder = servers.get(serverId);
if (holder != null) {
holder.updateSegmentsListAsync();
}
}
);
}
private boolean hasUnstabilityTimeoutPassed() private boolean hasUnstabilityTimeoutPassed()
{ {
if (isUnstable && (System.currentTimeMillis() - unstableStartTime) > config.getServerUnstabilityTimeout()) { if (isUnstable && (System.currentTimeMillis() - unstableStartTime) > config.getServerUnstabilityTimeout()) {

View File

@ -1,33 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client.selector;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.java.util.common.Pair;
import io.druid.query.Query;
/**
*/
public interface HostSelector<T>
{
public String getDefaultServiceName();
public Pair<String, ServerDiscoverySelector> select(Query<T> query);
}

View File

@ -22,6 +22,7 @@ package io.druid.curator.discovery;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject; import com.google.inject.Inject;
import io.druid.concurrent.Execs; import io.druid.concurrent.Execs;
import io.druid.concurrent.LifecycleLock; import io.druid.concurrent.LifecycleLock;
@ -167,6 +168,8 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
private final Object lock = new Object(); private final Object lock = new Object();
private boolean cacheInitialized = false;
NodeTypeWatcher( NodeTypeWatcher(
ExecutorService listenerExecutor, ExecutorService listenerExecutor,
CuratorFramework curatorFramework, CuratorFramework curatorFramework,
@ -201,21 +204,15 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
public void registerListener(DruidNodeDiscovery.Listener listener) public void registerListener(DruidNodeDiscovery.Listener listener)
{ {
synchronized (lock) { synchronized (lock) {
for (DiscoveryDruidNode node : nodes.values()) { if (cacheInitialized) {
listenerExecutor.submit(() -> { ImmutableList<DiscoveryDruidNode> currNodes = ImmutableList.copyOf(nodes.values());
try { safeSchedule(
listener.nodeAdded(node); () -> {
} listener.nodesAdded(currNodes);
catch (Exception ex) { },
log.error( "Exception occured in nodesAdded([%s]) in listener [%s].", currNodes, listener
ex,
"Exception occured in DiscoveryDruidNode.nodeAdded(node=[%s]) in listener [%s].",
node,
listener
); );
} }
});
}
nodeListeners.add(listener); nodeListeners.add(listener);
} }
} }
@ -280,8 +277,30 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
break; break;
} }
case INITIALIZED: {
if (cacheInitialized) {
log.warn("cache is already initialized. ignoring [%s] event, nodeType [%s].", event.getType(), nodeType);
return;
}
log.info("Received INITIALIZED in node watcher for type [%s].", nodeType);
cacheInitialized = true;
ImmutableList<DiscoveryDruidNode> currNodes = ImmutableList.copyOf(nodes.values());
for (Listener l : nodeListeners) {
safeSchedule(
() -> {
l.nodesAdded(currNodes);
},
"Exception occured in nodesAdded([%s]) in listener [%s].", currNodes, l
);
}
break;
}
default: { default: {
log.error("Ignored event type [%s] for nodeType [%s] watcher.", event.getType(), nodeType); log.info("Ignored event type [%s] for nodeType [%s] watcher.", event.getType(), nodeType);
} }
} }
} }
@ -291,35 +310,43 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
} }
} }
private void addNode(DiscoveryDruidNode druidNode) private void safeSchedule(
Runnable runnable,
String errMsgFormat, Object... args
)
{ {
synchronized (lock) {
DiscoveryDruidNode prev = nodes.putIfAbsent(druidNode.getDruidNode().getHostAndPortToUse(), druidNode);
if (prev == null) {
for (DruidNodeDiscovery.Listener l : nodeListeners) {
listenerExecutor.submit(() -> { listenerExecutor.submit(() -> {
try { try {
l.nodeAdded(druidNode); runnable.run();
} }
catch (Exception ex) { catch (Exception ex) {
log.error( log.error(errMsgFormat, args);
ex,
"Exception occured in DiscoveryDruidNode.nodeAdded(node=[%s]) in listener [%s].",
druidNode,
l
);
} }
}); });
} }
private void addNode(DiscoveryDruidNode druidNode)
{
DiscoveryDruidNode prev = nodes.putIfAbsent(druidNode.getDruidNode().getHostAndPortToUse(), druidNode);
if (prev == null) {
if (cacheInitialized) {
List<DiscoveryDruidNode> newNode = ImmutableList.of(druidNode);
for (Listener l : nodeListeners) {
safeSchedule(
() -> {
l.nodesAdded(newNode);
},
"Exception occured in nodeAdded(node=[%s]) in listener [%s].", druidNode, l
);
}
}
} else { } else {
log.warn("Node[%s] discovered but existed already [%s].", druidNode, prev); log.warn("Node[%s] discovered but existed already [%s].", druidNode, prev);
} }
} }
}
private void removeNode(DiscoveryDruidNode druidNode) private void removeNode(DiscoveryDruidNode druidNode)
{ {
synchronized (lock) {
DiscoveryDruidNode prev = nodes.remove(druidNode.getDruidNode().getHostAndPortToUse()); DiscoveryDruidNode prev = nodes.remove(druidNode.getDruidNode().getHostAndPortToUse());
if (prev == null) { if (prev == null) {
@ -327,21 +354,16 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
return; return;
} }
for (DruidNodeDiscovery.Listener l : nodeListeners) { if (cacheInitialized) {
listenerExecutor.submit(() -> { List<DiscoveryDruidNode> nodeRemoved = ImmutableList.of(druidNode);
try { for (Listener l : nodeListeners) {
l.nodeRemoved(druidNode); safeSchedule(
} () -> {
catch (Exception ex) { l.nodesRemoved(nodeRemoved);
log.error( },
ex, "Exception occured in nodeRemoved(node=[%s]) in listener [%s].", druidNode, l
"Exception occured in DiscoveryDruidNode.nodeRemoved(node=[%s]) in listener [%s].",
druidNode,
l
); );
} }
});
}
} }
} }
@ -351,7 +373,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
cache.getListenable().addListener( cache.getListenable().addListener(
(client, event) -> handleChildEvent(client, event) (client, event) -> handleChildEvent(client, event)
); );
cache.start(); cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
} }
catch (Exception ex) { catch (Exception ex) {
throw Throwables.propagate(ex); throw Throwables.propagate(ex);

View File

@ -20,6 +20,7 @@
package io.druid.discovery; package io.druid.discovery;
import java.util.Collection; import java.util.Collection;
import java.util.List;
/** /**
* Interface for discovering Druid Nodes announced by DruidNodeAnnouncer. * Interface for discovering Druid Nodes announced by DruidNodeAnnouncer.
@ -29,9 +30,23 @@ public interface DruidNodeDiscovery
Collection<DiscoveryDruidNode> getAllNodes(); Collection<DiscoveryDruidNode> getAllNodes();
void registerListener(Listener listener); void registerListener(Listener listener);
/**
* Listener for watching nodes in a DruidNodeDiscovery instance obtained via DruidNodeDiscoveryProvider.getXXX().
* DruidNodeDiscovery implementation should assume that Listener is not threadsafe and never call methods in
* Listener concurrently.
*
* Implementation of Listener must ensure to not do any time consuming work or block in any of the methods.
*/
interface Listener interface Listener
{ {
void nodeAdded(DiscoveryDruidNode node); /**
void nodeRemoved(DiscoveryDruidNode node); * List of nodes added.
* First call to this method is also a signal that underlying cache in the DruidNodeDiscovery implementation
* has been initialized.
* @param nodes
*/
void nodesAdded(List<DiscoveryDruidNode> nodes);
void nodesRemoved(List<DiscoveryDruidNode> nodes);
} }
} }

View File

@ -19,6 +19,7 @@
package io.druid.discovery; package io.druid.discovery;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import io.druid.java.util.common.IAE; import io.druid.java.util.common.IAE;
@ -27,6 +28,7 @@ import io.druid.java.util.common.logger.Logger;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -63,7 +65,8 @@ public abstract class DruidNodeDiscoveryProvider
WorkerNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NODE_TYPE_MM) WorkerNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NODE_TYPE_MM)
); );
private final Map<String, ServiceListener> serviceDiscoveryMap = new ConcurrentHashMap<>(SERVICE_TO_NODE_TYPES.size()); private final ConcurrentHashMap<String, ServiceDruidNodeDiscovery> serviceDiscoveryMap = new ConcurrentHashMap<>(
SERVICE_TO_NODE_TYPES.size());
/** /**
* Get DruidNodeDiscovery instance to discover nodes of given nodeType. * Get DruidNodeDiscovery instance to discover nodes of given nodeType.
@ -73,69 +76,47 @@ public abstract class DruidNodeDiscoveryProvider
/** /**
* Get DruidNodeDiscovery instance to discover nodes that announce given service in its metadata. * Get DruidNodeDiscovery instance to discover nodes that announce given service in its metadata.
*/ */
public synchronized DruidNodeDiscovery getForService(String serviceName) public DruidNodeDiscovery getForService(String serviceName)
{ {
ServiceListener nodeDiscovery = serviceDiscoveryMap.get(serviceName); return serviceDiscoveryMap.compute(
serviceName,
(k, v) -> {
if (v != null) {
return v;
}
if (nodeDiscovery == null) {
Set<String> nodeTypesToWatch = DruidNodeDiscoveryProvider.SERVICE_TO_NODE_TYPES.get(serviceName); Set<String> nodeTypesToWatch = DruidNodeDiscoveryProvider.SERVICE_TO_NODE_TYPES.get(serviceName);
if (nodeTypesToWatch == null) { if (nodeTypesToWatch == null) {
throw new IAE("Unknown service [%s].", serviceName); throw new IAE("Unknown service [%s].", serviceName);
} }
nodeDiscovery = new ServiceListener(serviceName); ServiceDruidNodeDiscovery serviceDiscovery = new ServiceDruidNodeDiscovery(serviceName);
for (String nodeType : nodeTypesToWatch) { for (String nodeType : nodeTypesToWatch) {
getForNodeType(nodeType).registerListener(nodeDiscovery); getForNodeType(nodeType).registerListener(serviceDiscovery.nodeTypeListener());
} }
serviceDiscoveryMap.put(serviceName, nodeDiscovery); return serviceDiscovery;
}
);
} }
return nodeDiscovery; private static class ServiceDruidNodeDiscovery implements DruidNodeDiscovery
}
private static class ServiceListener implements DruidNodeDiscovery, DruidNodeDiscovery.Listener
{ {
private static final Logger log = new Logger(ServiceDruidNodeDiscovery.class);
private final String service; private final String service;
private final Map<String, DiscoveryDruidNode> nodes = new ConcurrentHashMap<>(); private final Map<String, DiscoveryDruidNode> nodes = new ConcurrentHashMap<>();
private final List<Listener> listeners = new ArrayList<>(); private final List<Listener> listeners = new ArrayList<>();
ServiceListener(String service) private final Object lock = new Object();
private Set<NodeTypeListener> uninitializedNodeTypeListeners = new HashSet<>();
ServiceDruidNodeDiscovery(String service)
{ {
this.service = service; this.service = service;
} }
@Override
public synchronized void nodeAdded(DiscoveryDruidNode node)
{
if (node.getServices().containsKey(service)) {
DiscoveryDruidNode prev = nodes.putIfAbsent(node.getDruidNode().getHostAndPortToUse(), node);
if (prev == null) {
for (Listener listener : listeners) {
listener.nodeAdded(node);
}
} else {
log.warn("Node[%s] discovered but already exists [%s].", node, prev);
}
} else {
log.warn("Node[%s] discovered but doesn't have service[%s]. Ignored.", node, service);
}
}
@Override
public synchronized void nodeRemoved(DiscoveryDruidNode node)
{
DiscoveryDruidNode prev = nodes.remove(node.getDruidNode().getHostAndPortToUse());
if (prev != null) {
for (Listener listener : listeners) {
listener.nodeRemoved(node);
}
} else {
log.warn("Node[%s] disappeared but was unknown for service listener [%s].", node, service);
}
}
@Override @Override
public Collection<DiscoveryDruidNode> getAllNodes() public Collection<DiscoveryDruidNode> getAllNodes()
{ {
@ -143,12 +124,91 @@ public abstract class DruidNodeDiscoveryProvider
} }
@Override @Override
public synchronized void registerListener(Listener listener) public void registerListener(Listener listener)
{ {
for (DiscoveryDruidNode node : nodes.values()) { synchronized (lock) {
listener.nodeAdded(node); if (uninitializedNodeTypeListeners.isEmpty()) {
listener.nodesAdded(ImmutableList.copyOf(nodes.values()));
} }
listeners.add(listener); listeners.add(listener);
} }
} }
NodeTypeListener nodeTypeListener()
{
NodeTypeListener nodeListener = new NodeTypeListener();
uninitializedNodeTypeListeners.add(nodeListener);
return nodeListener;
}
class NodeTypeListener implements DruidNodeDiscovery.Listener
{
@Override
public void nodesAdded(List<DiscoveryDruidNode> nodesDiscovered)
{
synchronized (lock) {
ImmutableList.Builder<DiscoveryDruidNode> builder = ImmutableList.builder();
for (DiscoveryDruidNode node : nodesDiscovered) {
if (node.getServices().containsKey(service)) {
DiscoveryDruidNode prev = nodes.putIfAbsent(node.getDruidNode().getHostAndPortToUse(), node);
if (prev == null) {
builder.add(node);
} else {
log.warn("Node[%s] discovered but already exists [%s].", node, prev);
}
} else {
log.warn("Node[%s] discovered but doesn't have service[%s]. Ignored.", node, service);
}
}
ImmutableList<DiscoveryDruidNode> newNodesAdded = null;
if (uninitializedNodeTypeListeners.isEmpty()) {
newNodesAdded = builder.build();
} else if (uninitializedNodeTypeListeners.remove(this) && uninitializedNodeTypeListeners.isEmpty()) {
newNodesAdded = ImmutableList.copyOf(nodes.values());
}
if (newNodesAdded != null) {
for (Listener listener : listeners) {
try {
listener.nodesAdded(newNodesAdded);
}
catch (Exception ex) {
log.error(ex, "Listener[%s].nodesAdded(%s) threw exception. Ignored.", listener, newNodesAdded);
}
}
}
}
}
@Override
public void nodesRemoved(List<DiscoveryDruidNode> nodesDisappeared)
{
synchronized (lock) {
ImmutableList.Builder<DiscoveryDruidNode> builder = ImmutableList.builder();
for (DiscoveryDruidNode node : nodesDisappeared) {
DiscoveryDruidNode prev = nodes.remove(node.getDruidNode().getHostAndPortToUse());
if (prev != null) {
builder.add(node);
} else {
log.warn("Node[%s] disappeared but was unknown for service listener [%s].", node, service);
}
}
if (uninitializedNodeTypeListeners.isEmpty()) {
ImmutableList<DiscoveryDruidNode> nodesRemoved = builder.build();
for (Listener listener : listeners) {
try {
listener.nodesRemoved(nodesRemoved);
}
catch (Exception ex) {
log.error(ex, "Listener[%s].nodesRemoved(%s) threw exception. Ignored.", listener, nodesRemoved);
}
}
}
}
}
}
}
} }

View File

@ -213,6 +213,15 @@ public class DruidNode
return null; return null;
} }
public int getPortToUse()
{
if (serverConfig.isTls()) {
return getTlsPort();
} else {
return getPlaintextPort();
}
}
public String getHostAndPortToUse() public String getHostAndPortToUse()
{ {
return getHostAndTlsPort() != null ? getHostAndTlsPort() : getHostAndPort(); return getHostAndTlsPort() != null ? getHostAndTlsPort() : getHostAndPort();

View File

@ -20,12 +20,16 @@
package io.druid.server.coordination.broker; package io.druid.server.coordination.broker;
import com.google.common.base.Predicates; import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject; import com.google.inject.Inject;
import io.druid.client.FilteredServerInventoryView; import io.druid.client.FilteredServerInventoryView;
import io.druid.client.ServerView; import io.druid.client.ServerView;
import io.druid.curator.discovery.ServiceAnnouncer; import io.druid.curator.discovery.ServiceAnnouncer;
import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeAnnouncer;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.discovery.LookupNodeService;
import io.druid.guice.ManageLifecycle; import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self; import io.druid.guice.annotations.Self;
import io.druid.java.util.common.Pair; import io.druid.java.util.common.Pair;
@ -40,17 +44,28 @@ public class DruidBroker
{ {
private final DruidNode self; private final DruidNode self;
private final ServiceAnnouncer serviceAnnouncer; private final ServiceAnnouncer serviceAnnouncer;
private final DruidNodeAnnouncer druidNodeAnnouncer;
private final DiscoveryDruidNode discoveryDruidNode;
private volatile boolean started = false; private volatile boolean started = false;
@Inject @Inject
public DruidBroker( public DruidBroker(
final FilteredServerInventoryView serverInventoryView, final FilteredServerInventoryView serverInventoryView,
final @Self DruidNode self, final @Self DruidNode self,
final ServiceAnnouncer serviceAnnouncer final ServiceAnnouncer serviceAnnouncer,
final DruidNodeAnnouncer druidNodeAnnouncer,
final LookupNodeService lookupNodeService
) )
{ {
this.self = self; this.self = self;
this.serviceAnnouncer = serviceAnnouncer; this.serviceAnnouncer = serviceAnnouncer;
this.druidNodeAnnouncer = druidNodeAnnouncer;
this.discoveryDruidNode = new DiscoveryDruidNode(
self,
DruidNodeDiscoveryProvider.NODE_TYPE_BROKER,
ImmutableMap.of(lookupNodeService.getName(), lookupNodeService)
);
serverInventoryView.registerSegmentCallback( serverInventoryView.registerSegmentCallback(
MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor(),
@ -60,6 +75,7 @@ public class DruidBroker
public ServerView.CallbackAction segmentViewInitialized() public ServerView.CallbackAction segmentViewInitialized()
{ {
serviceAnnouncer.announce(self); serviceAnnouncer.announce(self);
druidNodeAnnouncer.announce(discoveryDruidNode);
return ServerView.CallbackAction.UNREGISTER; return ServerView.CallbackAction.UNREGISTER;
} }
}, },
@ -87,6 +103,7 @@ public class DruidBroker
return; return;
} }
serviceAnnouncer.unannounce(self); serviceAnnouncer.unannounce(self);
druidNodeAnnouncer.unannounce(discoveryDruidNode);
started = false; started = false;
} }
} }

View File

@ -20,7 +20,7 @@
package io.druid.server.http; package io.druid.server.http;
import com.google.inject.Inject; import com.google.inject.Inject;
import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.client.selector.Server;
import io.druid.server.router.TieredBrokerHostSelector; import io.druid.server.router.TieredBrokerHostSelector;
import javax.ws.rs.GET; import javax.ws.rs.GET;
@ -50,12 +50,12 @@ public class RouterResource
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
public Map<String, List<String>> getBrokers() public Map<String, List<String>> getBrokers()
{ {
Map<String, ServerDiscoverySelector> brokerSelectorMap = tieredBrokerHostSelector.getAllBrokers(); Map<String, List<Server>> brokerSelectorMap = tieredBrokerHostSelector.getAllBrokers();
Map<String, List<String>> brokersMap = new HashMap<>(brokerSelectorMap.size()); Map<String, List<String>> brokersMap = new HashMap<>(brokerSelectorMap.size());
for (Map.Entry<String, ServerDiscoverySelector> e : brokerSelectorMap.entrySet()) { for (Map.Entry<String, List<Server>> e : brokerSelectorMap.entrySet()) {
brokersMap.put(e.getKey(), e.getValue().getAll().stream().map(s -> s.getHost()).collect(Collectors.toList())); brokersMap.put(e.getKey(), e.getValue().stream().map(s -> s.getHost()).collect(Collectors.toList()));
} }
return brokersMap; return brokersMap;

View File

@ -1,200 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.listener.announcer;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.java.util.common.logger.Logger;
import io.druid.server.http.HostAndPortWithScheme;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.KeeperException;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
public class ListenerDiscoverer
{
private static final Logger LOG = new Logger(ListenerDiscoverer.class);
private volatile Map<HostAndPortWithScheme, Long> lastSeenMap = ImmutableMap.of();
private final CuratorFramework cf;
private final ListeningAnnouncerConfig listeningAnnouncerConfig;
private final Object startStopSync = new Object();
private volatile boolean started = false;
@Inject
public ListenerDiscoverer(
CuratorFramework cf,
ListeningAnnouncerConfig listeningAnnouncerConfig
)
{
this.cf = cf;
this.listeningAnnouncerConfig = listeningAnnouncerConfig;
}
@LifecycleStart
public void start()
{
synchronized (startStopSync) {
if (started) {
LOG.debug("Already started");
return;
}
started = true;
LOG.info("Started");
}
}
@LifecycleStop
public void stop()
{
synchronized (startStopSync) {
if (!started) {
LOG.debug("Already stopped");
return;
}
LOG.info("Stopped");
started = false;
}
}
/**
* Get nodes at a particular listener.
* This method lazily adds service discovery
*
* @param listener_key The Listener's service key
*
* @return A collection of druid nodes as established by the service discovery
*
* @throws IOException if there was an error refreshing the zookeeper cache
*/
public Collection<HostAndPortWithScheme> getNodes(final String listener_key) throws IOException
{
return getCurrentNodes(listener_key).keySet();
}
Map<HostAndPortWithScheme, Long> getCurrentNodes(final String listener_key) throws IOException
{
final HashMap<HostAndPortWithScheme, Long> retVal = new HashMap<>();
final String zkPath = listeningAnnouncerConfig.getAnnouncementPath(listener_key);
final Collection<String> children;
try {
children = cf.getChildren().forPath(zkPath);
}
catch (KeeperException.NoNodeException e) {
LOG.debug(e, "No path found at [%s]", zkPath);
return ImmutableMap.of();
}
catch (Exception e) {
throw new IOException("Error getting children for " + zkPath, e);
}
for (String child : children) {
final String childPath = ZKPaths.makePath(zkPath, child);
try {
final byte[] data;
try {
data = cf.getData().decompressed().forPath(childPath);
}
catch (Exception e) {
throw new IOException("Error getting data for " + childPath, e);
}
if (data == null) {
LOG.debug("Lost data at path [%s]", childPath);
continue;
}
final HostAndPortWithScheme hostAndPortWithScheme = HostAndPortWithScheme.fromString(child);
final Long l = ByteBuffer.wrap(data).getLong();
retVal.put(hostAndPortWithScheme, l);
}
catch (IllegalArgumentException iae) {
LOG.warn(iae, "Error parsing [%s]", childPath);
}
}
return ImmutableMap.copyOf(retVal);
}
/**
* Get only nodes that are new since the last time getNewNodes was called (or all nodes if it has never been called)
*
* @param listener_key The listener key to look for
*
* @return A collection of nodes that are new
*
* @throws IOException If there was an error in refreshing the Zookeeper cache
*/
public synchronized Collection<HostAndPortWithScheme> getNewNodes(final String listener_key) throws IOException
{
final Map<HostAndPortWithScheme, Long> priorSeenMap = lastSeenMap;
final Map<HostAndPortWithScheme, Long> currentMap = getCurrentNodes(listener_key);
final Collection<HostAndPortWithScheme> retVal = Collections2.filter(
currentMap.keySet(),
new Predicate<HostAndPortWithScheme>()
{
@Override
public boolean apply(HostAndPortWithScheme input)
{
final Long l = priorSeenMap.get(input);
return l == null || l < currentMap.get(input);
}
}
);
lastSeenMap = currentMap;
return retVal;
}
/**
* Discovers children of the listener key
*
* @param key_base The base of the listener key, or null or empty string to get all immediate children of the listener path
*
* @return A collection of the names of the children, or empty list on NoNodeException from Curator
*
* @throws IOException from Curator
* @throws RuntimeException for other exceptions from Curator.
*/
public Collection<String> discoverChildren(@Nullable final String key_base) throws IOException
{
final String zkPath = Strings.isNullOrEmpty(key_base)
? listeningAnnouncerConfig.getListenersPath()
: listeningAnnouncerConfig.getAnnouncementPath(key_base);
try {
return cf.getChildren().forPath(zkPath);
}
catch (KeeperException.NoNodeException | KeeperException.NoChildrenForEphemeralsException e) {
LOG.warn(e, "Path [%s] not discoverable", zkPath);
return ImmutableList.of();
}
catch (Exception e) {
Throwables.propagateIfInstanceOf(e, IOException.class);
throw Throwables.propagate(e);
}
}
}

View File

@ -25,7 +25,6 @@ import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Collections2; import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
@ -47,6 +46,7 @@ import io.druid.audit.AuditInfo;
import io.druid.common.config.JacksonConfigManager; import io.druid.common.config.JacksonConfigManager;
import io.druid.concurrent.Execs; import io.druid.concurrent.Execs;
import io.druid.concurrent.LifecycleLock; import io.druid.concurrent.LifecycleLock;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Smile; import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.IAE; import io.druid.java.util.common.IAE;
@ -54,10 +54,8 @@ import io.druid.java.util.common.IOE;
import io.druid.java.util.common.ISE; import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StreamUtils; import io.druid.java.util.common.StreamUtils;
import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.StringUtils;
import io.druid.query.lookup.LookupModule;
import io.druid.query.lookup.LookupsState; import io.druid.query.lookup.LookupsState;
import io.druid.server.http.HostAndPortWithScheme; import io.druid.server.http.HostAndPortWithScheme;
import io.druid.server.listener.announcer.ListenerDiscoverer;
import io.druid.server.listener.resource.ListenerResource; import io.druid.server.listener.resource.ListenerResource;
import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpMethod;
@ -109,7 +107,9 @@ public class LookupCoordinatorManager
private static final EmittingLogger LOG = new EmittingLogger(LookupCoordinatorManager.class); private static final EmittingLogger LOG = new EmittingLogger(LookupCoordinatorManager.class);
private final ListenerDiscoverer listenerDiscoverer; private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
private LookupNodeDiscovery lookupNodeDiscovery;
private final JacksonConfigManager configManager; private final JacksonConfigManager configManager;
private final LookupCoordinatorManagerConfig lookupCoordinatorManagerConfig; private final LookupCoordinatorManagerConfig lookupCoordinatorManagerConfig;
private final LookupsCommunicator lookupsCommunicator; private final LookupsCommunicator lookupsCommunicator;
@ -134,32 +134,35 @@ public class LookupCoordinatorManager
@Inject @Inject
public LookupCoordinatorManager( public LookupCoordinatorManager(
final @Global HttpClient httpClient, final @Global HttpClient httpClient,
final ListenerDiscoverer listenerDiscoverer, final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
final @Smile ObjectMapper smileMapper, final @Smile ObjectMapper smileMapper,
final JacksonConfigManager configManager, final JacksonConfigManager configManager,
final LookupCoordinatorManagerConfig lookupCoordinatorManagerConfig final LookupCoordinatorManagerConfig lookupCoordinatorManagerConfig
) )
{ {
this( this(
listenerDiscoverer, druidNodeDiscoveryProvider,
configManager, configManager,
lookupCoordinatorManagerConfig, lookupCoordinatorManagerConfig,
new LookupsCommunicator(httpClient, lookupCoordinatorManagerConfig, smileMapper) new LookupsCommunicator(httpClient, lookupCoordinatorManagerConfig, smileMapper),
null
); );
} }
@VisibleForTesting @VisibleForTesting
LookupCoordinatorManager( LookupCoordinatorManager(
final ListenerDiscoverer listenerDiscoverer, final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
final JacksonConfigManager configManager, final JacksonConfigManager configManager,
final LookupCoordinatorManagerConfig lookupCoordinatorManagerConfig, final LookupCoordinatorManagerConfig lookupCoordinatorManagerConfig,
final LookupsCommunicator lookupsCommunicator final LookupsCommunicator lookupsCommunicator,
final LookupNodeDiscovery lookupNodeDiscovery
) )
{ {
this.listenerDiscoverer = listenerDiscoverer; this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider;
this.configManager = configManager; this.configManager = configManager;
this.lookupCoordinatorManagerConfig = lookupCoordinatorManagerConfig; this.lookupCoordinatorManagerConfig = lookupCoordinatorManagerConfig;
this.lookupsCommunicator = lookupsCommunicator; this.lookupsCommunicator = lookupsCommunicator;
this.lookupNodeDiscovery = lookupNodeDiscovery;
} }
public boolean updateLookup( public boolean updateLookup(
@ -275,23 +278,17 @@ public class LookupCoordinatorManager
} }
} }
public Collection<String> discoverTiers() public Set<String> discoverTiers()
{ {
try {
Preconditions.checkState(lifecycleLock.awaitStarted(5, TimeUnit.SECONDS), "not started"); Preconditions.checkState(lifecycleLock.awaitStarted(5, TimeUnit.SECONDS), "not started");
return listenerDiscoverer.discoverChildren(LookupCoordinatorManager.LOOKUP_LISTEN_ANNOUNCE_KEY); return lookupNodeDiscovery.getAllTiers();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
} }
public Collection<HostAndPort> discoverNodesInTier(String tier) public Collection<HostAndPort> discoverNodesInTier(String tier)
{ {
try {
Preconditions.checkState(lifecycleLock.awaitStarted(5, TimeUnit.SECONDS), "not started"); Preconditions.checkState(lifecycleLock.awaitStarted(5, TimeUnit.SECONDS), "not started");
return Collections2.transform( return Collections2.transform(
listenerDiscoverer.getNodes(LookupModule.getTierListenerPath(tier)), lookupNodeDiscovery.getNodesInTier(tier),
new Function<HostAndPortWithScheme, HostAndPort>() new Function<HostAndPortWithScheme, HostAndPort>()
{ {
@Override @Override
@ -302,10 +299,6 @@ public class LookupCoordinatorManager
} }
); );
} }
catch (IOException e) {
throw new RuntimeException(e);
}
}
public Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>> getLastKnownLookupsStateOnNodes() public Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>> getLastKnownLookupsStateOnNodes()
{ {
@ -348,6 +341,10 @@ public class LookupCoordinatorManager
try { try {
LOG.debug("Starting."); LOG.debug("Starting.");
if (lookupNodeDiscovery == null) {
lookupNodeDiscovery = new LookupNodeDiscovery(druidNodeDiscoveryProvider);
}
//first ensure that previous executorService from last cycle of start/stop has finished completely. //first ensure that previous executorService from last cycle of start/stop has finished completely.
//so that we don't have multiple live executorService instances lying around doing lookup management. //so that we don't have multiple live executorService instances lying around doing lookup management.
if (executorService != null && if (executorService != null &&
@ -522,7 +519,7 @@ public class LookupCoordinatorManager
LOG.debug("Starting lookup mgmt for tier [%s].", tierEntry.getKey()); LOG.debug("Starting lookup mgmt for tier [%s].", tierEntry.getKey());
final Map<String, LookupExtractorFactoryMapContainer> tierLookups = tierEntry.getValue(); final Map<String, LookupExtractorFactoryMapContainer> tierLookups = tierEntry.getValue();
for (final HostAndPortWithScheme node : listenerDiscoverer.getNodes(LookupModule.getTierListenerPath(tierEntry.getKey()))) { for (final HostAndPortWithScheme node : lookupNodeDiscovery.getNodesInTier(tierEntry.getKey())) {
LOG.debug( LOG.debug(
"Starting lookup mgmt for tier [%s] and host [%s:%s:%s].", "Starting lookup mgmt for tier [%s] and host [%s:%s:%s].",

View File

@ -0,0 +1,88 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.lookup.cache;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableSet;
import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeDiscovery;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.discovery.LookupNodeService;
import io.druid.server.http.HostAndPortWithScheme;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Set;
/**
* A Helper class that uses DruidNodeDiscovery to discover lookup nodes and tiers.
*/
public class LookupNodeDiscovery
{
private final DruidNodeDiscovery druidNodeDiscovery;
LookupNodeDiscovery(DruidNodeDiscoveryProvider druidNodeDiscoveryProvider)
{
this.druidNodeDiscovery = druidNodeDiscoveryProvider.getForService(LookupNodeService.DISCOVERY_SERVICE_KEY);
}
public Collection<HostAndPortWithScheme> getNodesInTier(String tier)
{
return Collections2.transform(
Collections2.filter(
druidNodeDiscovery.getAllNodes(),
new Predicate<DiscoveryDruidNode>()
{
@Override
public boolean apply(@Nullable DiscoveryDruidNode node)
{
return tier.equals(((LookupNodeService) node.getServices()
.get(LookupNodeService.DISCOVERY_SERVICE_KEY)).getLookupTier());
}
}
),
new Function<DiscoveryDruidNode, HostAndPortWithScheme>()
{
@Override
public HostAndPortWithScheme apply(@Nullable DiscoveryDruidNode input)
{
return HostAndPortWithScheme.fromString(
input.getDruidNode().getServiceScheme(),
input.getDruidNode().getHostAndPortToUse()
);
}
}
);
}
public Set<String> getAllTiers()
{
ImmutableSet.Builder<String> builder = new ImmutableSet.Builder<>();
druidNodeDiscovery.getAllNodes().stream().forEach(
node -> builder.add(((LookupNodeService) node.getServices()
.get(LookupNodeService.DISCOVERY_SERVICE_KEY)).getLookupTier())
);
return builder.build();
}
}

View File

@ -24,12 +24,12 @@ import com.google.common.collect.FluentIterable;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import io.druid.client.selector.Server; import io.druid.client.selector.Server;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.java.util.common.ISE; import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair; import io.druid.java.util.common.Pair;
import io.druid.query.Query; import io.druid.query.Query;
import java.util.Collection; import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
/** /**
@ -52,27 +52,27 @@ public class QueryHostFinder
public <T> Server findServer(Query<T> query) public <T> Server findServer(Query<T> query)
{ {
final Pair<String, ServerDiscoverySelector> selected = hostSelector.select(query); final Pair<String, Server> selected = hostSelector.select(query);
return findServerInner(selected); return findServerInner(selected);
} }
public Server findDefaultServer() public Server findDefaultServer()
{ {
final Pair<String, ServerDiscoverySelector> selected = hostSelector.getDefaultLookup(); final Pair<String, Server> selected = hostSelector.getDefaultLookup();
return findServerInner(selected); return findServerInner(selected);
} }
public Collection<String> getAllHosts() public Collection<String> getAllHosts()
{ {
return FluentIterable return FluentIterable
.from((Collection<ServerDiscoverySelector>) hostSelector.getAllBrokers().values()) .from((Collection<List<Server>>) hostSelector.getAllBrokers().values())
.transformAndConcat( .transformAndConcat(
new Function<ServerDiscoverySelector, Iterable<Server>>() new Function<List<Server>, Iterable<Server>>()
{ {
@Override @Override
public Iterable<Server> apply(ServerDiscoverySelector input) public Iterable<Server> apply(List<Server> input)
{ {
return input.getAll(); return input;
} }
} }
).transform(new Function<Server, String>() ).transform(new Function<Server, String>()
@ -118,16 +118,15 @@ public class QueryHostFinder
return server.getHost(); return server.getHost();
} }
private Server findServerInner(final Pair<String, ServerDiscoverySelector> selected) private Server findServerInner(final Pair<String, Server> selected)
{ {
if (selected == null) { if (selected == null) {
log.error("Danger, Will Robinson! Unable to find any brokers!"); log.error("Danger, Will Robinson! Unable to find any brokers!");
} }
final String serviceName = selected == null ? hostSelector.getDefaultServiceName() : selected.lhs; final String serviceName = selected == null ? hostSelector.getDefaultServiceName() : selected.lhs;
final ServerDiscoverySelector selector = selected == null ? null : selected.rhs; Server server = selected == null ? null : selected.rhs;
Server server = selector == null ? null : selector.pick();
if (server == null) { if (server == null) {
log.error( log.error(
"WTF?! No server found for serviceName[%s]. Using backup", "WTF?! No server found for serviceName[%s]. Using backup",

View File

@ -19,14 +19,17 @@
package io.druid.server.router; package io.druid.server.router;
import com.google.common.base.Function;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import io.druid.client.selector.HostSelector; import io.druid.client.selector.Server;
import io.druid.curator.discovery.ServerDiscoveryFactory; import io.druid.discovery.DiscoveryDruidNode;
import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.discovery.DruidNodeDiscovery;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Pair; import io.druid.java.util.common.Pair;
import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStart;
@ -37,38 +40,75 @@ import io.druid.server.coordinator.rules.Rule;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.util.Collections; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
/** /**
*/ */
public class TieredBrokerHostSelector<T> implements HostSelector<T> public class TieredBrokerHostSelector<T>
{ {
private static EmittingLogger log = new EmittingLogger(TieredBrokerHostSelector.class); private static EmittingLogger log = new EmittingLogger(TieredBrokerHostSelector.class);
private final CoordinatorRuleManager ruleManager; private final CoordinatorRuleManager ruleManager;
private final TieredBrokerConfig tierConfig; private final TieredBrokerConfig tierConfig;
private final ServerDiscoveryFactory serverDiscoveryFactory;
private final ConcurrentHashMap<String, ServerDiscoverySelector> selectorMap = new ConcurrentHashMap<>();
private final List<TieredBrokerSelectorStrategy> strategies; private final List<TieredBrokerSelectorStrategy> strategies;
// brokerService -> broker-nodes-holder
private final ConcurrentHashMap<String, NodesHolder> servers = new ConcurrentHashMap<>();
private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
private final Object lock = new Object(); private final Object lock = new Object();
private volatile boolean started = false; private volatile boolean started = false;
private static final Function<DiscoveryDruidNode, Server> TO_SERVER = new Function<DiscoveryDruidNode, Server>()
{
@Override
public Server apply(final DiscoveryDruidNode instance)
{
return new Server()
{
@Override
public String getHost()
{
return instance.getDruidNode().getHostAndPortToUse();
}
@Override
public String getAddress()
{
return instance.getDruidNode().getHost();
}
@Override
public int getPort()
{
return instance.getDruidNode().getPortToUse();
}
@Override
public String getScheme()
{
return instance.getDruidNode().getServiceScheme();
}
};
}
};
@Inject @Inject
public TieredBrokerHostSelector( public TieredBrokerHostSelector(
CoordinatorRuleManager ruleManager, CoordinatorRuleManager ruleManager,
TieredBrokerConfig tierConfig, TieredBrokerConfig tierConfig,
ServerDiscoveryFactory serverDiscoveryFactory, DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
List<TieredBrokerSelectorStrategy> strategies List<TieredBrokerSelectorStrategy> strategies
) )
{ {
this.ruleManager = ruleManager; this.ruleManager = ruleManager;
this.tierConfig = tierConfig; this.tierConfig = tierConfig;
this.serverDiscoveryFactory = serverDiscoveryFactory; this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider;
this.strategies = strategies; this.strategies = strategies;
} }
@ -80,17 +120,42 @@ public class TieredBrokerHostSelector<T> implements HostSelector<T>
return; return;
} }
try {
for (Map.Entry<String, String> entry : tierConfig.getTierToBrokerMap().entrySet()) { for (Map.Entry<String, String> entry : tierConfig.getTierToBrokerMap().entrySet()) {
ServerDiscoverySelector selector = serverDiscoveryFactory.createSelector(entry.getValue()); servers.put(entry.getValue(), new NodesHolder());
selector.start(); }
selectorMap.put(entry.getValue(), selector);
DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_BROKER);
druidNodeDiscovery.registerListener(
new DruidNodeDiscovery.Listener()
{
@Override
public void nodesAdded(List<DiscoveryDruidNode> nodes)
{
nodes.forEach(
(node) -> {
NodesHolder nodesHolder = servers.get(node.getDruidNode().getServiceName());
if (nodesHolder != null) {
nodesHolder.add(node.getDruidNode().getHostAndPortToUse(), TO_SERVER.apply(node));
} }
} }
catch (Exception e) { );
throw Throwables.propagate(e);
} }
@Override
public void nodesRemoved(List<DiscoveryDruidNode> nodes)
{
nodes.forEach(
(node) -> {
NodesHolder nodesHolder = servers.get(node.getDruidNode().getServiceName());
if (nodesHolder != null) {
nodesHolder.remove(node.getDruidNode().getHostAndPortToUse());
}
}
);
}
}
);
started = true; started = true;
} }
} }
@ -104,27 +169,16 @@ public class TieredBrokerHostSelector<T> implements HostSelector<T>
return; return;
} }
try {
for (ServerDiscoverySelector selector : selectorMap.values()) {
selector.stop();
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
started = false; started = false;
} }
} }
@Override
public String getDefaultServiceName() public String getDefaultServiceName()
{ {
return tierConfig.getDefaultBrokerServiceName(); return tierConfig.getDefaultBrokerServiceName();
} }
@Override public Pair<String, Server> select(final Query<T> query)
public Pair<String, ServerDiscoverySelector> select(final Query<T> query)
{ {
synchronized (lock) { synchronized (lock) {
if (!ruleManager.isStarted() || !started) { if (!ruleManager.isStarted() || !started) {
@ -186,29 +240,83 @@ public class TieredBrokerHostSelector<T> implements HostSelector<T>
brokerServiceName = tierConfig.getDefaultBrokerServiceName(); brokerServiceName = tierConfig.getDefaultBrokerServiceName();
} }
ServerDiscoverySelector retVal = selectorMap.get(brokerServiceName); NodesHolder nodesHolder = servers.get(brokerServiceName);
if (retVal == null) { if (nodesHolder == null) {
log.error( log.error(
"WTF?! No selector found for brokerServiceName[%s]. Using default selector for[%s]", "WTF?! No nodesHolder found for brokerServiceName[%s]. Using default selector for[%s]",
brokerServiceName, brokerServiceName,
tierConfig.getDefaultBrokerServiceName() tierConfig.getDefaultBrokerServiceName()
); );
retVal = selectorMap.get(tierConfig.getDefaultBrokerServiceName()); nodesHolder = servers.get(tierConfig.getDefaultBrokerServiceName());
} }
return new Pair<>(brokerServiceName, retVal); return new Pair<>(brokerServiceName, nodesHolder.pick());
} }
public Pair<String, ServerDiscoverySelector> getDefaultLookup() public Pair<String, Server> getDefaultLookup()
{ {
final String brokerServiceName = tierConfig.getDefaultBrokerServiceName(); final String brokerServiceName = tierConfig.getDefaultBrokerServiceName();
final ServerDiscoverySelector retVal = selectorMap.get(brokerServiceName); return new Pair<>(brokerServiceName, servers.get(brokerServiceName).pick());
return new Pair<>(brokerServiceName, retVal);
} }
public Map<String, ServerDiscoverySelector> getAllBrokers() public Map<String, List<Server>> getAllBrokers()
{ {
return Collections.unmodifiableMap(selectorMap); return Maps.transformValues(
servers,
new Function<NodesHolder, List<Server>>()
{
@Override
public List<Server> apply(NodesHolder input)
{
return input.getAll();
}
}
);
}
private static class NodesHolder
{
private int roundRobinIndex = 0;
private Map<String, Server> nodesMap = new HashMap<>();
private ImmutableList<Server> nodes = ImmutableList.of();
void add(String id, Server node)
{
synchronized (this) {
nodesMap.put(id, node);
nodes = ImmutableList.copyOf(nodesMap.values());
}
}
void remove(String id)
{
synchronized (this) {
if (nodesMap.remove(id) != null) {
nodes = ImmutableList.copyOf(nodesMap.values());
}
}
}
List<Server> getAll()
{
return nodes;
}
Server pick()
{
ImmutableList<Server> currNodes = nodes;
if (currNodes.size() == 0) {
return null;
}
if (roundRobinIndex >= currNodes.size()) {
roundRobinIndex %= currNodes.size();
}
return currNodes.get(roundRobinIndex++);
}
} }
} }

View File

@ -39,6 +39,7 @@ import org.junit.Test;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Set; import java.util.Set;
/** /**
@ -128,15 +129,15 @@ public class CuratorDruidNodeAnnouncerAndDiscoveryTest extends CuratorTestBase
new DruidNodeDiscovery.Listener() new DruidNodeDiscovery.Listener()
{ {
@Override @Override
public void nodeAdded(DiscoveryDruidNode node) public void nodesAdded(List<DiscoveryDruidNode> nodes)
{ {
coordNodes.add(node); coordNodes.addAll(nodes);
} }
@Override @Override
public void nodeRemoved(DiscoveryDruidNode node) public void nodesRemoved(List<DiscoveryDruidNode> nodes)
{ {
coordNodes.remove(node); coordNodes.removeAll(nodes);
} }
} }
); );
@ -146,15 +147,15 @@ public class CuratorDruidNodeAnnouncerAndDiscoveryTest extends CuratorTestBase
new DruidNodeDiscovery.Listener() new DruidNodeDiscovery.Listener()
{ {
@Override @Override
public void nodeAdded(DiscoveryDruidNode node) public void nodesAdded(List<DiscoveryDruidNode> nodes)
{ {
overlordNodes.add(node); overlordNodes.addAll(nodes);
} }
@Override @Override
public void nodeRemoved(DiscoveryDruidNode node) public void nodesRemoved(List<DiscoveryDruidNode> nodes)
{ {
overlordNodes.remove(node); overlordNodes.removeAll(nodes);
} }
} }
); );

View File

@ -19,6 +19,7 @@
package io.druid.discovery; package io.druid.discovery;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import io.druid.server.DruidNode; import io.druid.server.DruidNode;
@ -47,15 +48,15 @@ public class DruidNodeDiscoveryProviderTest
new DruidNodeDiscovery.Listener() new DruidNodeDiscovery.Listener()
{ {
@Override @Override
public void nodeAdded(DiscoveryDruidNode node) public void nodesAdded(List<DiscoveryDruidNode> nodes)
{ {
dataNodes.add(node); dataNodes.addAll(nodes);
} }
@Override @Override
public void nodeRemoved(DiscoveryDruidNode node) public void nodesRemoved(List<DiscoveryDruidNode> nodes)
{ {
dataNodes.remove(node); dataNodes.removeAll(nodes);
} }
} }
); );
@ -66,15 +67,15 @@ public class DruidNodeDiscoveryProviderTest
new DruidNodeDiscovery.Listener() new DruidNodeDiscovery.Listener()
{ {
@Override @Override
public void nodeAdded(DiscoveryDruidNode node) public void nodesAdded(List<DiscoveryDruidNode> nodes)
{ {
lookupNodes.add(node); lookupNodes.addAll(nodes);
} }
@Override @Override
public void nodeRemoved(DiscoveryDruidNode node) public void nodesRemoved(List<DiscoveryDruidNode> nodes)
{ {
lookupNodes.remove(node); lookupNodes.removeAll(nodes);
} }
} }
); );
@ -204,14 +205,14 @@ public class DruidNodeDiscoveryProviderTest
void add(DiscoveryDruidNode node) void add(DiscoveryDruidNode node)
{ {
for (DruidNodeDiscovery.Listener listener : listeners) { for (DruidNodeDiscovery.Listener listener : listeners) {
listener.nodeAdded(node); listener.nodesAdded(ImmutableList.of(node));
} }
} }
void remove(DiscoveryDruidNode node) void remove(DiscoveryDruidNode node)
{ {
for (DruidNodeDiscovery.Listener listener : listeners) { for (DruidNodeDiscovery.Listener listener : listeners) {
listener.nodeRemoved(node); listener.nodesRemoved(ImmutableList.of(node));
} }
} }
} }

View File

@ -22,6 +22,7 @@ package io.druid.server.http;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.ByteSource; import com.google.common.io.ByteSource;
import com.google.common.net.HostAndPort; import com.google.common.net.HostAndPort;
import io.druid.audit.AuditInfo; import io.druid.audit.AuditInfo;
@ -42,8 +43,8 @@ import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
public class LookupCoordinatorResourceTest public class LookupCoordinatorResourceTest
{ {
@ -147,7 +148,7 @@ public class LookupCoordinatorResourceTest
@Test @Test
public void testDiscoveryGet() public void testDiscoveryGet()
{ {
final List<String> tiers = ImmutableList.of(); final Set<String> tiers = ImmutableSet.of();
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock( final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(
LookupCoordinatorManager.class); LookupCoordinatorManager.class);
EasyMock.expect(lookupCoordinatorManager.discoverTiers()).andReturn(tiers).once(); EasyMock.expect(lookupCoordinatorManager.discoverTiers()).andReturn(tiers).once();

View File

@ -1,153 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.listener.announcer;
import com.google.common.collect.ImmutableSet;
import io.druid.concurrent.Execs;
import io.druid.curator.CuratorTestBase;
import io.druid.curator.announcement.Announcer;
import io.druid.segment.CloserRule;
import io.druid.server.http.HostAndPortWithScheme;
import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.utils.ZKPaths;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
public class ListenerDiscovererTest extends CuratorTestBase
{
@Rule
public CloserRule closerRule = new CloserRule(true);
@Test(timeout = 60_000L)
public void testFullService() throws Exception
{
final String listenerKey = "listenerKey";
final String listenerTier = "listenerTier";
final String listenerTierChild = "tierChild";
final String tierZkPath = ZKPaths.makePath(listenerTier, listenerTierChild);
setupServerAndCurator();
final ExecutorService executorService = Execs.singleThreaded("listenerDiscovererTest--%s");
closerRule.closeLater(new Closeable()
{
@Override
public void close() throws IOException
{
executorService.shutdownNow();
}
});
closerRule.closeLater(server);
closerRule.closeLater(curator);
curator.start();
curator.blockUntilConnected(10, TimeUnit.SECONDS);
Assert.assertEquals("/druid", curator.create().forPath("/druid"));
final Announcer announcer = new Announcer(curator, executorService);
closerRule.closeLater(new Closeable()
{
@Override
public void close() throws IOException
{
announcer.stop();
}
});
final ListeningAnnouncerConfig config = new ListeningAnnouncerConfig(new ZkPathsConfig());
final ListenerDiscoverer listenerDiscoverer = new ListenerDiscoverer(curator, config);
listenerDiscoverer.start();
closerRule.closeLater(new Closeable()
{
@Override
public void close() throws IOException
{
listenerDiscoverer.stop();
}
});
Assert.assertTrue(listenerDiscoverer.getNodes(listenerKey).isEmpty());
final HostAndPortWithScheme node = HostAndPortWithScheme.fromParts("http", "someHost", 8888);
final ListenerResourceAnnouncer listenerResourceAnnouncer = new ListenerResourceAnnouncer(
announcer,
config,
listenerKey,
node
)
{
};
listenerResourceAnnouncer.start();
closerRule.closeLater(new Closeable()
{
@Override
public void close() throws IOException
{
listenerResourceAnnouncer.stop();
}
});
final ListenerResourceAnnouncer tieredListenerResourceAnnouncer = new ListenerResourceAnnouncer(
announcer,
config,
tierZkPath,
node
)
{
};
tieredListenerResourceAnnouncer.start();
closerRule.closeLater(new Closeable()
{
@Override
public void close() throws IOException
{
tieredListenerResourceAnnouncer.stop();
}
});
announcer.start();
Assert.assertNotNull(curator.checkExists().forPath(config.getAnnouncementPath(listenerKey)));
// Have to wait for background syncing
while (listenerDiscoverer.getNodes(listenerKey).isEmpty()) {
// Will timeout at test's timeout setting
Thread.sleep(1);
}
Assert.assertEquals(
ImmutableSet.of(HostAndPortWithScheme.fromString(node.toString())),
listenerDiscoverer.getNodes(listenerKey)
);
// 2nd call of two concurrent getNewNodes should return no entry collection
listenerDiscoverer.getNewNodes(listenerKey);
Assert.assertEquals(
0,
listenerDiscoverer.getNewNodes(listenerKey).size()
);
Assert.assertEquals(
ImmutableSet.of(listenerKey, listenerTier),
ImmutableSet.copyOf(listenerDiscoverer.discoverChildren(null))
);
Assert.assertEquals(
ImmutableSet.of(listenerTierChild),
ImmutableSet.copyOf(listenerDiscoverer.discoverChildren(listenerTier))
);
}
}

View File

@ -36,17 +36,14 @@ import com.metamx.http.client.response.HttpResponseHandler;
import com.metamx.http.client.response.SequenceInputStreamResponseHandler; import com.metamx.http.client.response.SequenceInputStreamResponseHandler;
import io.druid.audit.AuditInfo; import io.druid.audit.AuditInfo;
import io.druid.common.config.JacksonConfigManager; import io.druid.common.config.JacksonConfigManager;
import io.druid.java.util.common.StringUtils; import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.jackson.DefaultObjectMapper; import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.IAE; import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE; import io.druid.java.util.common.ISE;
import io.druid.query.lookup.LookupModule; import io.druid.java.util.common.StringUtils;
import io.druid.query.lookup.LookupsState; import io.druid.query.lookup.LookupsState;
import io.druid.server.http.HostAndPortWithScheme; import io.druid.server.http.HostAndPortWithScheme;
import io.druid.server.listener.announcer.ListenerDiscoverer;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.joda.time.Duration; import org.joda.time.Duration;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -63,6 +60,7 @@ import java.io.InputStream;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -73,7 +71,9 @@ public class LookupCoordinatorManagerTest
@Rule @Rule
public ExpectedException expectedException = ExpectedException.none(); public ExpectedException expectedException = ExpectedException.none();
private final ObjectMapper mapper = new DefaultObjectMapper(); private final ObjectMapper mapper = new DefaultObjectMapper();
private final ListenerDiscoverer discoverer = EasyMock.createStrictMock(ListenerDiscoverer.class); private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createStrictMock(DruidNodeDiscoveryProvider.class);
private final LookupNodeDiscovery lookupNodeDiscovery = EasyMock.createStrictMock(
LookupNodeDiscovery.class);
private final HttpClient client = EasyMock.createStrictMock(HttpClient.class); private final HttpClient client = EasyMock.createStrictMock(HttpClient.class);
private final JacksonConfigManager configManager = EasyMock.createStrictMock(JacksonConfigManager.class); private final JacksonConfigManager configManager = EasyMock.createStrictMock(JacksonConfigManager.class);
private final LookupCoordinatorManagerConfig lookupCoordinatorManagerConfig = new LookupCoordinatorManagerConfig(); private final LookupCoordinatorManagerConfig lookupCoordinatorManagerConfig = new LookupCoordinatorManagerConfig();
@ -139,6 +139,8 @@ public class LookupCoordinatorManagerTest
SERVICE_EMITTER.flush(); SERVICE_EMITTER.flush();
EVENT_EMITS.set(0L); EVENT_EMITS.set(0L);
EasyMock.reset(lookupNodeDiscovery);
EasyMock.reset(configManager); EasyMock.reset(configManager);
EasyMock.expect( EasyMock.expect(
configManager.watch( configManager.watch(
@ -532,7 +534,7 @@ public class LookupCoordinatorManagerTest
{ {
final LookupCoordinatorManager manager = new LookupCoordinatorManager( final LookupCoordinatorManager manager = new LookupCoordinatorManager(
client, client,
discoverer, druidNodeDiscoveryProvider,
mapper, mapper,
configManager, configManager,
lookupCoordinatorManagerConfig lookupCoordinatorManagerConfig
@ -555,7 +557,7 @@ public class LookupCoordinatorManagerTest
{ {
final LookupCoordinatorManager manager = new LookupCoordinatorManager( final LookupCoordinatorManager manager = new LookupCoordinatorManager(
client, client,
discoverer, druidNodeDiscoveryProvider,
mapper, mapper,
configManager, configManager,
lookupCoordinatorManagerConfig lookupCoordinatorManagerConfig
@ -587,7 +589,7 @@ public class LookupCoordinatorManagerTest
{ {
final LookupCoordinatorManager manager = new LookupCoordinatorManager( final LookupCoordinatorManager manager = new LookupCoordinatorManager(
client, client,
discoverer, druidNodeDiscoveryProvider,
mapper, mapper,
configManager, configManager,
lookupCoordinatorManagerConfig lookupCoordinatorManagerConfig
@ -624,7 +626,7 @@ public class LookupCoordinatorManagerTest
final AuditInfo auditInfo = new AuditInfo("author", "comment", "localhost"); final AuditInfo auditInfo = new AuditInfo("author", "comment", "localhost");
final LookupCoordinatorManager manager = new LookupCoordinatorManager( final LookupCoordinatorManager manager = new LookupCoordinatorManager(
client, client,
discoverer, druidNodeDiscoveryProvider,
mapper, mapper,
configManager, configManager,
lookupCoordinatorManagerConfig lookupCoordinatorManagerConfig
@ -686,7 +688,7 @@ public class LookupCoordinatorManagerTest
final AuditInfo auditInfo = new AuditInfo("author", "comment", "localhost"); final AuditInfo auditInfo = new AuditInfo("author", "comment", "localhost");
final LookupCoordinatorManager manager = new LookupCoordinatorManager( final LookupCoordinatorManager manager = new LookupCoordinatorManager(
client, client,
discoverer, druidNodeDiscoveryProvider,
mapper, mapper,
configManager, configManager,
lookupCoordinatorManagerConfig lookupCoordinatorManagerConfig
@ -741,7 +743,7 @@ public class LookupCoordinatorManagerTest
{ {
final LookupCoordinatorManager manager = new LookupCoordinatorManager( final LookupCoordinatorManager manager = new LookupCoordinatorManager(
client, client,
discoverer, druidNodeDiscoveryProvider,
mapper, mapper,
configManager, configManager,
lookupCoordinatorManagerConfig lookupCoordinatorManagerConfig
@ -771,7 +773,7 @@ public class LookupCoordinatorManagerTest
{ {
final LookupCoordinatorManager manager = new LookupCoordinatorManager( final LookupCoordinatorManager manager = new LookupCoordinatorManager(
client, client,
discoverer, druidNodeDiscoveryProvider,
mapper, mapper,
configManager, configManager,
lookupCoordinatorManagerConfig lookupCoordinatorManagerConfig
@ -805,7 +807,7 @@ public class LookupCoordinatorManagerTest
final AuditInfo auditInfo = new AuditInfo("author", "comment", "localhost"); final AuditInfo auditInfo = new AuditInfo("author", "comment", "localhost");
final LookupCoordinatorManager manager = new LookupCoordinatorManager( final LookupCoordinatorManager manager = new LookupCoordinatorManager(
client, client,
discoverer, druidNodeDiscoveryProvider,
mapper, mapper,
configManager, configManager,
lookupCoordinatorManagerConfig lookupCoordinatorManagerConfig
@ -860,7 +862,7 @@ public class LookupCoordinatorManagerTest
); );
final LookupCoordinatorManager manager = new LookupCoordinatorManager( final LookupCoordinatorManager manager = new LookupCoordinatorManager(
client, client,
discoverer, druidNodeDiscoveryProvider,
mapper, mapper,
configManager, configManager,
lookupCoordinatorManagerConfig lookupCoordinatorManagerConfig
@ -905,7 +907,7 @@ public class LookupCoordinatorManagerTest
); );
final LookupCoordinatorManager manager = new LookupCoordinatorManager( final LookupCoordinatorManager manager = new LookupCoordinatorManager(
client, client,
discoverer, druidNodeDiscoveryProvider,
mapper, mapper,
configManager, configManager,
lookupCoordinatorManagerConfig lookupCoordinatorManagerConfig
@ -930,7 +932,7 @@ public class LookupCoordinatorManagerTest
{ {
final LookupCoordinatorManager manager = new LookupCoordinatorManager( final LookupCoordinatorManager manager = new LookupCoordinatorManager(
client, client,
discoverer, druidNodeDiscoveryProvider,
mapper, mapper,
configManager, configManager,
lookupCoordinatorManagerConfig lookupCoordinatorManagerConfig
@ -956,7 +958,7 @@ public class LookupCoordinatorManagerTest
); );
final LookupCoordinatorManager manager = new LookupCoordinatorManager( final LookupCoordinatorManager manager = new LookupCoordinatorManager(
client, client,
discoverer, druidNodeDiscoveryProvider,
mapper, mapper,
configManager, configManager,
lookupCoordinatorManagerConfig lookupCoordinatorManagerConfig
@ -985,7 +987,7 @@ public class LookupCoordinatorManagerTest
); );
final LookupCoordinatorManager manager = new LookupCoordinatorManager( final LookupCoordinatorManager manager = new LookupCoordinatorManager(
client, client,
discoverer, druidNodeDiscoveryProvider,
mapper, mapper,
configManager, configManager,
lookupCoordinatorManagerConfig lookupCoordinatorManagerConfig
@ -1010,7 +1012,7 @@ public class LookupCoordinatorManagerTest
{ {
final LookupCoordinatorManager manager = new LookupCoordinatorManager( final LookupCoordinatorManager manager = new LookupCoordinatorManager(
client, client,
discoverer, druidNodeDiscoveryProvider,
mapper, mapper,
configManager, configManager,
lookupCoordinatorManagerConfig lookupCoordinatorManagerConfig
@ -1052,11 +1054,11 @@ public class LookupCoordinatorManagerTest
HostAndPortWithScheme host1 = HostAndPortWithScheme.fromParts("http", "host1", 1234); HostAndPortWithScheme host1 = HostAndPortWithScheme.fromParts("http", "host1", 1234);
HostAndPortWithScheme host2 = HostAndPortWithScheme.fromParts("http", "host2", 3456); HostAndPortWithScheme host2 = HostAndPortWithScheme.fromParts("http", "host2", 3456);
EasyMock.reset(discoverer); EasyMock.reset(lookupNodeDiscovery);
EasyMock.expect( EasyMock.expect(
discoverer.getNodes(LookupModule.getTierListenerPath("tier1")) lookupNodeDiscovery.getNodesInTier("tier1")
).andReturn(ImmutableList.of(host1, host2)).anyTimes(); ).andReturn(ImmutableList.of(host1, host2)).anyTimes();
EasyMock.replay(discoverer); EasyMock.replay(lookupNodeDiscovery);
LookupCoordinatorManager.LookupsCommunicator lookupsCommunicator = EasyMock.createMock(LookupCoordinatorManager.LookupsCommunicator.class); LookupCoordinatorManager.LookupsCommunicator lookupsCommunicator = EasyMock.createMock(LookupCoordinatorManager.LookupsCommunicator.class);
EasyMock.expect( EasyMock.expect(
@ -1134,10 +1136,11 @@ public class LookupCoordinatorManagerTest
}; };
final LookupCoordinatorManager manager = new LookupCoordinatorManager( final LookupCoordinatorManager manager = new LookupCoordinatorManager(
discoverer, druidNodeDiscoveryProvider,
configManager, configManager,
lookupCoordinatorManagerConfig, lookupCoordinatorManagerConfig,
lookupsCommunicator lookupsCommunicator,
lookupNodeDiscovery
); );
Assert.assertTrue(manager.knownOldState.get().isEmpty()); Assert.assertTrue(manager.knownOldState.get().isEmpty());
@ -1155,7 +1158,7 @@ public class LookupCoordinatorManagerTest
Thread.sleep(100); Thread.sleep(100);
} }
EasyMock.verify(discoverer, configManager, lookupsCommunicator); EasyMock.verify(lookupNodeDiscovery, configManager, lookupsCommunicator);
} }
@Test @Test
@ -1163,7 +1166,7 @@ public class LookupCoordinatorManagerTest
{ {
LookupCoordinatorManager manager = new LookupCoordinatorManager( LookupCoordinatorManager manager = new LookupCoordinatorManager(
client, client,
discoverer, druidNodeDiscoveryProvider,
mapper, mapper,
configManager, configManager,
lookupCoordinatorManagerConfig lookupCoordinatorManagerConfig
@ -1199,7 +1202,7 @@ public class LookupCoordinatorManagerTest
{ {
LookupCoordinatorManager manager = new LookupCoordinatorManager( LookupCoordinatorManager manager = new LookupCoordinatorManager(
client, client,
discoverer, druidNodeDiscoveryProvider,
mapper, mapper,
configManager, configManager,
lookupCoordinatorManagerConfig lookupCoordinatorManagerConfig
@ -1246,7 +1249,7 @@ public class LookupCoordinatorManagerTest
final LookupCoordinatorManager manager = new LookupCoordinatorManager( final LookupCoordinatorManager manager = new LookupCoordinatorManager(
client, client,
discoverer, druidNodeDiscoveryProvider,
mapper, mapper,
configManager, configManager,
lookupCoordinatorManagerConfig lookupCoordinatorManagerConfig
@ -1283,7 +1286,7 @@ public class LookupCoordinatorManagerTest
final LookupCoordinatorManager manager = new LookupCoordinatorManager( final LookupCoordinatorManager manager = new LookupCoordinatorManager(
client, client,
discoverer, druidNodeDiscoveryProvider,
mapper, mapper,
configManager, configManager,
lookupCoordinatorManagerConfig lookupCoordinatorManagerConfig
@ -1327,60 +1330,56 @@ public class LookupCoordinatorManagerTest
@Test @Test
public void testLookupDiscoverAll() throws Exception public void testLookupDiscoverAll() throws Exception
{ {
final List<String> fakeChildren = ImmutableList.of("tier1", "tier2"); final Set<String> fakeChildren = ImmutableSet.of("tier1", "tier2");
EasyMock.reset(discoverer); EasyMock.reset(lookupNodeDiscovery);
EasyMock.expect(discoverer.discoverChildren(LookupCoordinatorManager.LOOKUP_LISTEN_ANNOUNCE_KEY)) EasyMock.expect(lookupNodeDiscovery.getAllTiers())
.andReturn(fakeChildren) .andReturn(fakeChildren)
.once(); .once();
EasyMock.replay(discoverer); EasyMock.replay(lookupNodeDiscovery);
final LookupCoordinatorManager manager = new LookupCoordinatorManager( final LookupCoordinatorManager manager = new LookupCoordinatorManager(
client, druidNodeDiscoveryProvider,
discoverer,
mapper,
configManager, configManager,
lookupCoordinatorManagerConfig lookupCoordinatorManagerConfig,
EasyMock.createMock(LookupCoordinatorManager.LookupsCommunicator.class),
lookupNodeDiscovery
); );
manager.start(); manager.start();
Assert.assertEquals(fakeChildren, manager.discoverTiers()); Assert.assertEquals(fakeChildren, manager.discoverTiers());
EasyMock.verify(discoverer); EasyMock.verify(lookupNodeDiscovery);
} }
@Test @Test
public void testLookupDiscoverAllExceptional() throws Exception public void testDiscoverNodesInTier() throws Exception
{ {
final IOException ex = new IOException("some exception"); EasyMock.reset(lookupNodeDiscovery);
EasyMock.reset(discoverer); EasyMock.expect(lookupNodeDiscovery.getNodesInTier("tier"))
EasyMock.expect(discoverer.discoverChildren(LookupCoordinatorManager.LOOKUP_LISTEN_ANNOUNCE_KEY)) .andReturn(
.andThrow(ex) ImmutableSet.of(
HostAndPortWithScheme.fromParts("http", "h1", 8080),
HostAndPortWithScheme.fromParts("http", "h2", 8080)
)
)
.once(); .once();
expectedException.expectCause( EasyMock.replay(lookupNodeDiscovery);
new BaseMatcher<Throwable>()
{
@Override
public boolean matches(Object o)
{
return o == ex;
}
@Override
public void describeTo(Description description)
{
}
}
);
EasyMock.replay(discoverer);
final LookupCoordinatorManager manager = new LookupCoordinatorManager( final LookupCoordinatorManager manager = new LookupCoordinatorManager(
client, druidNodeDiscoveryProvider,
discoverer,
mapper,
configManager, configManager,
lookupCoordinatorManagerConfig lookupCoordinatorManagerConfig,
EasyMock.createMock(LookupCoordinatorManager.LookupsCommunicator.class),
lookupNodeDiscovery
); );
manager.start(); manager.start();
manager.discoverTiers(); Assert.assertEquals(
EasyMock.verify(discoverer); ImmutableSet.of(
HostAndPort.fromParts("h1", 8080),
HostAndPort.fromParts("h2", 8080)
),
ImmutableSet.copyOf(manager.discoverNodesInTier("tier")));
EasyMock.verify(lookupNodeDiscovery);
} }
//tests that lookups stored in db from 0.10.0 are converted and restored. //tests that lookups stored in db from 0.10.0 are converted and restored.
@ -1434,7 +1433,7 @@ public class LookupCoordinatorManagerTest
final LookupCoordinatorManager manager = new LookupCoordinatorManager( final LookupCoordinatorManager manager = new LookupCoordinatorManager(
client, client,
discoverer, druidNodeDiscoveryProvider,
mapper, mapper,
configManager, configManager,
new LookupCoordinatorManagerConfig() new LookupCoordinatorManagerConfig()

View File

@ -0,0 +1,121 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.lookup.cache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeDiscovery;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.discovery.LookupNodeService;
import io.druid.server.DruidNode;
import io.druid.server.http.HostAndPortWithScheme;
import io.druid.server.initialization.ServerConfig;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
*/
public class LookupNodeDiscoveryTest
{
private DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
private DruidNodeDiscovery druidNodeDiscovery;
private LookupNodeDiscovery lookupNodeDiscovery;
@Before
public void setup()
{
druidNodeDiscoveryProvider = EasyMock.createStrictMock(DruidNodeDiscoveryProvider.class);
druidNodeDiscovery = EasyMock.createStrictMock(DruidNodeDiscovery.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForService(LookupNodeService.DISCOVERY_SERVICE_KEY))
.andReturn(druidNodeDiscovery);
DiscoveryDruidNode node1 = new DiscoveryDruidNode(
new DruidNode("s1", "h1", 8080, null, new ServerConfig()),
DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL,
ImmutableMap.of(
LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier1"))
);
DiscoveryDruidNode node2 = new DiscoveryDruidNode(
new DruidNode("s2", "h2", 8080, null, new ServerConfig()),
DruidNodeDiscoveryProvider.NODE_TYPE_PEON,
ImmutableMap.of(
LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier1"))
);
DiscoveryDruidNode node3 = new DiscoveryDruidNode(
new DruidNode("s3", "h3", 8080, null, new ServerConfig()),
DruidNodeDiscoveryProvider.NODE_TYPE_PEON,
ImmutableMap.of(
LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier2"))
);
EasyMock.expect(druidNodeDiscovery.getAllNodes())
.andReturn(ImmutableSet.of(node1, node2, node3))
.anyTimes();;
EasyMock.replay(druidNodeDiscoveryProvider, druidNodeDiscovery);
lookupNodeDiscovery = new LookupNodeDiscovery(druidNodeDiscoveryProvider);
}
@Test
public void testGetNodesInTier() throws Exception
{
Assert.assertEquals(
ImmutableList.of(
HostAndPortWithScheme.fromParts("http", "h1", 8080),
HostAndPortWithScheme.fromParts("http", "h2", 8080)
),
ImmutableList.copyOf(lookupNodeDiscovery.getNodesInTier("tier1"))
);
Assert.assertEquals(
ImmutableList.of(
HostAndPortWithScheme.fromParts("http", "h3", 8080)
),
ImmutableList.copyOf(lookupNodeDiscovery.getNodesInTier("tier2"))
);
Assert.assertEquals(
ImmutableList.of(),
ImmutableList.copyOf(lookupNodeDiscovery.getNodesInTier("tier3"))
);
EasyMock.verify(druidNodeDiscoveryProvider, druidNodeDiscovery);
}
@Test
public void testGetAllTiers() throws Exception
{
Assert.assertEquals(
ImmutableSet.of("tier1", "tier2"),
lookupNodeDiscovery.getAllTiers()
);
EasyMock.verify(druidNodeDiscoveryProvider, druidNodeDiscovery);
}
}

View File

@ -19,10 +19,7 @@
package io.druid.server.router; package io.druid.server.router;
import com.google.common.collect.ImmutableMap;
import io.druid.client.DruidServer;
import io.druid.client.selector.Server; import io.druid.client.selector.Server;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.java.util.common.Intervals; import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.Pair; import io.druid.java.util.common.Pair;
import io.druid.query.Query; import io.druid.query.Query;
@ -37,44 +34,19 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.Arrays; import java.util.Arrays;
import java.util.LinkedHashMap;
/** /**
*/ */
public class QueryHostFinderTest public class QueryHostFinderTest
{ {
private ServerDiscoverySelector selector;
private TieredBrokerHostSelector brokerSelector; private TieredBrokerHostSelector brokerSelector;
private TieredBrokerConfig config;
private Server server; private Server server;
@Before @Before
public void setUp() throws Exception public void setUp() throws Exception
{ {
selector = EasyMock.createMock(ServerDiscoverySelector.class);
brokerSelector = EasyMock.createMock(TieredBrokerHostSelector.class); brokerSelector = EasyMock.createMock(TieredBrokerHostSelector.class);
config = new TieredBrokerConfig()
{
@Override
public LinkedHashMap<String, String> getTierToBrokerMap()
{
return new LinkedHashMap<>(
ImmutableMap.<String, String>of(
"hot", "hotBroker",
"medium", "mediumBroker",
DruidServer.DEFAULT_TIER, "coldBroker"
)
);
}
@Override
public String getDefaultBrokerServiceName()
{
return "hotBroker";
}
};
server = new Server() server = new Server()
{ {
@Override @Override
@ -101,24 +73,22 @@ public class QueryHostFinderTest
return 0; return 0;
} }
}; };
EasyMock.expect(brokerSelector.select(EasyMock.anyObject(Query.class))).andReturn(
Pair.of("service", server)
);
EasyMock.replay(brokerSelector);
} }
@After @After
public void tearDown() throws Exception public void tearDown() throws Exception
{ {
EasyMock.verify(brokerSelector); EasyMock.verify(brokerSelector);
EasyMock.verify(selector);
} }
@Test @Test
public void testFindServer() throws Exception public void testFindServer() throws Exception
{ {
EasyMock.expect(brokerSelector.select(EasyMock.<Query>anyObject())).andReturn(new Pair("hotBroker", selector));
EasyMock.replay(brokerSelector);
EasyMock.expect(selector.pick()).andReturn(server).once();
EasyMock.replay(selector);
QueryHostFinder queryRunner = new QueryHostFinder( QueryHostFinder queryRunner = new QueryHostFinder(
brokerSelector brokerSelector
); );

View File

@ -20,21 +20,32 @@
package io.druid.server.router; package io.druid.server.router;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
import io.druid.client.DruidServer; import io.druid.client.DruidServer;
import io.druid.curator.discovery.ServerDiscoveryFactory; import io.druid.client.selector.Server;
import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeDiscovery;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Json;
import io.druid.java.util.common.Intervals; import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.Pair;
import io.druid.query.Druids; import io.druid.query.Druids;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.spec.MultipleIntervalSegmentSpec; import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.server.DruidNode;
import io.druid.server.coordinator.rules.IntervalLoadRule; import io.druid.server.coordinator.rules.IntervalLoadRule;
import io.druid.server.coordinator.rules.Rule; import io.druid.server.coordinator.rules.Rule;
import io.druid.server.initialization.ServerConfig;
import org.easymock.EasyMock; import org.easymock.EasyMock;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.junit.After; import org.junit.After;
@ -42,7 +53,9 @@ import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import javax.annotation.Nullable;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
@ -50,18 +63,59 @@ import java.util.List;
*/ */
public class TieredBrokerHostSelectorTest public class TieredBrokerHostSelectorTest
{ {
private ServerDiscoveryFactory factory; private DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
private ServerDiscoverySelector selector; private DruidNodeDiscovery druidNodeDiscovery;
private TieredBrokerHostSelector brokerSelector; private TieredBrokerHostSelector brokerSelector;
private DiscoveryDruidNode node1;
private DiscoveryDruidNode node2;
private DiscoveryDruidNode node3;
@Before @Before
public void setUp() throws Exception public void setUp() throws Exception
{ {
factory = EasyMock.createMock(ServerDiscoveryFactory.class); druidNodeDiscoveryProvider = EasyMock.createStrictMock(DruidNodeDiscoveryProvider.class);
selector = EasyMock.createMock(ServerDiscoverySelector.class);
node1 = new DiscoveryDruidNode(
new DruidNode("hotBroker", "hotHost", 8080, null, new ServerConfig()),
DruidNodeDiscoveryProvider.NODE_TYPE_BROKER,
ImmutableMap.of()
);
node2 = new DiscoveryDruidNode(
new DruidNode("coldBroker", "coldHost1", 8080, null, new ServerConfig()),
DruidNodeDiscoveryProvider.NODE_TYPE_BROKER,
ImmutableMap.of()
);
node3 = new DiscoveryDruidNode(
new DruidNode("coldBroker", "coldHost2", 8080, null, new ServerConfig()),
DruidNodeDiscoveryProvider.NODE_TYPE_BROKER,
ImmutableMap.of()
);
druidNodeDiscovery = new DruidNodeDiscovery()
{
@Override
public Collection<DiscoveryDruidNode> getAllNodes()
{
return ImmutableSet.of(node1, node2, node3);
}
@Override
public void registerListener(Listener listener)
{
listener.nodesAdded(ImmutableList.of(node1, node2, node3));
}
};
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_BROKER))
.andReturn(druidNodeDiscovery);;
EasyMock.replay(druidNodeDiscoveryProvider);
brokerSelector = new TieredBrokerHostSelector( brokerSelector = new TieredBrokerHostSelector(
new TestRuleManager(null, null, null, null), new TestRuleManager(null, null, null),
new TieredBrokerConfig() new TieredBrokerConfig()
{ {
@Override @Override
@ -82,20 +136,11 @@ public class TieredBrokerHostSelectorTest
return "hotBroker"; return "hotBroker";
} }
}, },
factory, druidNodeDiscoveryProvider,
Arrays.asList(new TimeBoundaryTieredBrokerSelectorStrategy(), new PriorityTieredBrokerSelectorStrategy(0, 1)) Arrays.asList(new TimeBoundaryTieredBrokerSelectorStrategy(), new PriorityTieredBrokerSelectorStrategy(0, 1))
); );
EasyMock.expect(factory.createSelector(EasyMock.<String>anyObject())).andReturn(selector).atLeastOnce();
EasyMock.replay(factory);
selector.start();
EasyMock.expectLastCall().atLeastOnce();
selector.stop();
EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(selector);
brokerSelector.start(); brokerSelector.start();
} }
@After @After
@ -103,39 +148,47 @@ public class TieredBrokerHostSelectorTest
{ {
brokerSelector.stop(); brokerSelector.stop();
EasyMock.verify(selector); EasyMock.verify(druidNodeDiscoveryProvider);
EasyMock.verify(factory);
} }
@Test @Test
public void testBasicSelect() throws Exception public void testBasicSelect() throws Exception
{ {
String brokerName = (String) brokerSelector.select( TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
Druids.newTimeseriesQueryBuilder()
.dataSource("test") .dataSource("test")
.granularity("all") .granularity("all")
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("rows"))) .aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("rows")))
.intervals(Arrays.<Interval>asList(Intervals.of("2011-08-31/2011-09-01"))) .intervals(Arrays.<Interval>asList(Intervals.of("2011-08-31/2011-09-01")))
.build() .build();
).lhs;
Assert.assertEquals("coldBroker", brokerName); Pair<String, Server> p = brokerSelector.select(query);
Assert.assertEquals("coldBroker", p.lhs);
Assert.assertEquals("coldHost1:8080", p.rhs.getHost());
p = brokerSelector.select(query);
Assert.assertEquals("coldBroker", p.lhs);
Assert.assertEquals("coldHost2:8080", p.rhs.getHost());
p = brokerSelector.select(query);
Assert.assertEquals("coldBroker", p.lhs);
Assert.assertEquals("coldHost1:8080", p.rhs.getHost());
} }
@Test @Test
public void testBasicSelect2() throws Exception public void testBasicSelect2() throws Exception
{ {
String brokerName = (String) brokerSelector.select( Pair<String, Server> p = brokerSelector.select(
Druids.newTimeseriesQueryBuilder() Druids.newTimeseriesQueryBuilder()
.dataSource("test") .dataSource("test")
.granularity("all") .granularity("all")
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("rows"))) .aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("rows")))
.intervals(Arrays.<Interval>asList(Intervals.of("2013-08-31/2013-09-01"))) .intervals(Arrays.<Interval>asList(Intervals.of("2013-08-31/2013-09-01")))
.build() .build()
).lhs; );
Assert.assertEquals("hotBroker", brokerName); Assert.assertEquals("hotBroker", p.lhs);
Assert.assertEquals("hotHost:8080", p.rhs.getHost());
} }
@Test @Test
@ -241,16 +294,38 @@ public class TieredBrokerHostSelectorTest
Assert.assertEquals("hotBroker", brokerName); Assert.assertEquals("hotBroker", brokerName);
} }
@Test
public void testGetAllBrokers()
{
Assert.assertEquals(
ImmutableMap.of(
"mediumBroker", ImmutableList.of(),
"coldBroker", ImmutableList.of("coldHost1:8080", "coldHost2:8080"),
"hotBroker", ImmutableList.of("hotHost:8080")
),
Maps.transformValues(
brokerSelector.getAllBrokers(),
new Function<List<Server>, List<String>>()
{
@Override
public List<String> apply(@Nullable List<Server> servers)
{
return Lists.transform(servers, server -> server.getHost());
}
}
)
);
}
private static class TestRuleManager extends CoordinatorRuleManager private static class TestRuleManager extends CoordinatorRuleManager
{ {
public TestRuleManager( public TestRuleManager(
@Global HttpClient httpClient, @Global HttpClient httpClient,
@Json ObjectMapper jsonMapper, @Json ObjectMapper jsonMapper,
Supplier<TieredBrokerConfig> config, Supplier<TieredBrokerConfig> config
ServerDiscoverySelector selector
) )
{ {
super(httpClient, jsonMapper, config, selector); super(httpClient, jsonMapper, config, null);
} }
@Override @Override

View File

@ -21,7 +21,6 @@ package io.druid.cli;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.inject.Binder; import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module; import com.google.inject.Module;
import com.google.inject.name.Names; import com.google.inject.name.Names;
import io.airlift.airline.Command; import io.airlift.airline.Command;
@ -34,8 +33,6 @@ import io.druid.client.cache.CacheMonitor;
import io.druid.client.selector.CustomTierSelectorStrategyConfig; import io.druid.client.selector.CustomTierSelectorStrategyConfig;
import io.druid.client.selector.ServerSelectorStrategy; import io.druid.client.selector.ServerSelectorStrategy;
import io.druid.client.selector.TierSelectorStrategy; import io.druid.client.selector.TierSelectorStrategy;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.discovery.LookupNodeService;
import io.druid.guice.CacheModule; import io.druid.guice.CacheModule;
import io.druid.guice.DruidProcessingModule; import io.druid.guice.DruidProcessingModule;
import io.druid.guice.Jerseys; import io.druid.guice.Jerseys;
@ -123,14 +120,6 @@ public class CliBroker extends ServerRunnable
MetricsModule.register(binder, CacheMonitor.class); MetricsModule.register(binder, CacheMonitor.class);
LifecycleModule.register(binder, Server.class); LifecycleModule.register(binder, Server.class);
binder.bind(DiscoverySideEffectsProvider.Child.class).toProvider(
new DiscoverySideEffectsProvider(
DruidNodeDiscoveryProvider.NODE_TYPE_BROKER,
ImmutableList.of(LookupNodeService.class)
)
).in(LazySingleton.class);
LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class));
} }
}, },
new LookupModule(), new LookupModule(),

View File

@ -75,7 +75,6 @@ import io.druid.server.http.RulesResource;
import io.druid.server.http.ServersResource; import io.druid.server.http.ServersResource;
import io.druid.server.http.TiersResource; import io.druid.server.http.TiersResource;
import io.druid.server.initialization.jetty.JettyServerInitializer; import io.druid.server.initialization.jetty.JettyServerInitializer;
import io.druid.server.listener.announcer.ListenerDiscoverer;
import io.druid.server.lookup.cache.LookupCoordinatorManager; import io.druid.server.lookup.cache.LookupCoordinatorManager;
import io.druid.server.lookup.cache.LookupCoordinatorManagerConfig; import io.druid.server.lookup.cache.LookupCoordinatorManagerConfig;
import io.druid.server.router.TieredBrokerConfig; import io.druid.server.router.TieredBrokerConfig;
@ -168,9 +167,6 @@ public class CliCoordinator extends ServerRunnable
binder.bind(LookupCoordinatorManager.class).in(LazySingleton.class); binder.bind(LookupCoordinatorManager.class).in(LazySingleton.class);
binder.bind(DruidCoordinator.class); binder.bind(DruidCoordinator.class);
binder.bind(ListenerDiscoverer.class).in(ManageLifecycle.class);
LifecycleModule.register(binder, ListenerDiscoverer.class);
LifecycleModule.register(binder, MetadataStorage.class); LifecycleModule.register(binder, MetadataStorage.class);
LifecycleModule.register(binder, DruidCoordinator.class); LifecycleModule.register(binder, DruidCoordinator.class);