diff --git a/server/src/main/java/io/druid/client/HttpServerInventoryView.java b/server/src/main/java/io/druid/client/HttpServerInventoryView.java index 201276ed473..71adf3493ad 100644 --- a/server/src/main/java/io/druid/client/HttpServerInventoryView.java +++ b/server/src/main/java/io/druid/client/HttpServerInventoryView.java @@ -68,6 +68,7 @@ import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.io.InputStream; import java.net.URL; +import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -104,10 +105,10 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer private volatile ExecutorService executor; - // a queue of queryable server names for which worker threads in executor initiate the segment list call i.e. - // DruidServerHolder.updateSegmentsListAsync(..) which updates the segment list asynchronously and adds itself - // to this queue again for next update. - private final BlockingQueue queue = new LinkedBlockingDeque<>(); + // the work queue, all items in this are sequentially processed by main thread setup in start() + // used to call inventoryInitialized on all SegmentCallbacks and + // for keeping segment list for each queryable server uptodate. + private final BlockingQueue queue = new LinkedBlockingDeque<>(); private final HttpClient httpClient; private final ObjectMapper smileMapper; @@ -161,10 +162,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { try { - DruidServerHolder holder = servers.get(queue.take()); - if (holder != null) { - holder.updateSegmentsListAsync(); - } + queue.take().run(); } catch (InterruptedException ex) { log.info("main thread interrupted, served segments list is not synced anymore."); @@ -184,17 +182,27 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer druidNodeDiscovery.registerListener( new DruidNodeDiscovery.Listener() { + private volatile boolean initialized = false; @Override - public void nodeAdded(DiscoveryDruidNode node) + public void nodesAdded(List nodes) { - serverAddedOrUpdated(toDruidServer(node)); + nodes.forEach( + node -> serverAddedOrUpdated(toDruidServer(node)) + ); + + if (!initialized) { + initialized = true; + queue.add(HttpServerInventoryView.this::serverInventoryInitialized); + } } @Override - public void nodeRemoved(DiscoveryDruidNode node) + public void nodesRemoved(List nodes) { - serverRemoved(toDruidServer(node)); + nodes.forEach( + node -> serverRemoved(toDruidServer(node)) + ); } private DruidServer toDruidServer(DiscoveryDruidNode node) @@ -572,7 +580,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer log.error(ex, "error processing segment list response from server [%s]", druidServer.getName()); } finally { - queue.add(druidServer.getName()); + addNextSyncToWorkQueue(druidServer.getName()); } } @@ -611,7 +619,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer } } finally { - queue.add(druidServer.getName()); + addNextSyncToWorkQueue(druidServer.getName()); } } }, @@ -621,7 +629,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer return future; } catch (Throwable th) { - queue.add(druidServer.getName()); + addNextSyncToWorkQueue(druidServer.getName()); String logMsg = StringUtils.nonStrictFormat( "Fatal error while fetching 
segment list from server [%s].", druidServer.getName() @@ -646,6 +654,18 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer } } + private void addNextSyncToWorkQueue(final String serverId) + { + queue.add( + () -> { + DruidServerHolder holder = servers.get(serverId); + if (holder != null) { + holder.updateSegmentsListAsync(); + } + } + ); + } + private boolean hasUnstabilityTimeoutPassed() { if (isUnstable && (System.currentTimeMillis() - unstableStartTime) > config.getServerUnstabilityTimeout()) { diff --git a/server/src/main/java/io/druid/client/selector/HostSelector.java b/server/src/main/java/io/druid/client/selector/HostSelector.java deleted file mode 100644 index 3afafac4aa8..00000000000 --- a/server/src/main/java/io/druid/client/selector/HostSelector.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.client.selector; - -import io.druid.curator.discovery.ServerDiscoverySelector; -import io.druid.java.util.common.Pair; -import io.druid.query.Query; - -/** - */ -public interface HostSelector -{ - public String getDefaultServiceName(); - - public Pair select(Query query); -} diff --git a/server/src/main/java/io/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java b/server/src/main/java/io/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java index a3eaee13323..b4e4827760a 100644 --- a/server/src/main/java/io/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java +++ b/server/src/main/java/io/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java @@ -22,6 +22,7 @@ package io.druid.curator.discovery; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; import com.google.inject.Inject; import io.druid.concurrent.Execs; import io.druid.concurrent.LifecycleLock; @@ -167,6 +168,8 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide private final Object lock = new Object(); + private boolean cacheInitialized = false; + NodeTypeWatcher( ExecutorService listenerExecutor, CuratorFramework curatorFramework, @@ -201,20 +204,14 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide public void registerListener(DruidNodeDiscovery.Listener listener) { synchronized (lock) { - for (DiscoveryDruidNode node : nodes.values()) { - listenerExecutor.submit(() -> { - try { - listener.nodeAdded(node); - } - catch (Exception ex) { - log.error( - ex, - "Exception occured in DiscoveryDruidNode.nodeAdded(node=[%s]) in listener [%s].", - node, - listener - ); - } - }); + if (cacheInitialized) { + ImmutableList currNodes = 
ImmutableList.copyOf(nodes.values()); + safeSchedule( + () -> { + listener.nodesAdded(currNodes); + }, + "Exception occured in nodesAdded([%s]) in listener [%s].", currNodes, listener + ); } nodeListeners.add(listener); } @@ -280,8 +277,30 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide break; } + case INITIALIZED: { + if (cacheInitialized) { + log.warn("cache is already initialized. ignoring [%s] event, nodeType [%s].", event.getType(), nodeType); + return; + } + + log.info("Received INITIALIZED in node watcher for type [%s].", nodeType); + + cacheInitialized = true; + + ImmutableList currNodes = ImmutableList.copyOf(nodes.values()); + for (Listener l : nodeListeners) { + safeSchedule( + () -> { + l.nodesAdded(currNodes); + }, + "Exception occured in nodesAdded([%s]) in listener [%s].", currNodes, l + ); + } + + break; + } default: { - log.error("Ignored event type [%s] for nodeType [%s] watcher.", event.getType(), nodeType); + log.info("Ignored event type [%s] for nodeType [%s] watcher.", event.getType(), nodeType); } } } @@ -291,56 +310,59 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide } } + private void safeSchedule( + Runnable runnable, + String errMsgFormat, Object... args + ) + { + listenerExecutor.submit(() -> { + try { + runnable.run(); + } + catch (Exception ex) { + log.error(errMsgFormat, args); + } + }); + } + private void addNode(DiscoveryDruidNode druidNode) { - synchronized (lock) { - DiscoveryDruidNode prev = nodes.putIfAbsent(druidNode.getDruidNode().getHostAndPortToUse(), druidNode); - if (prev == null) { - for (DruidNodeDiscovery.Listener l : nodeListeners) { - listenerExecutor.submit(() -> { - try { - l.nodeAdded(druidNode); - } - catch (Exception ex) { - log.error( - ex, - "Exception occured in DiscoveryDruidNode.nodeAdded(node=[%s]) in listener [%s].", - druidNode, - l - ); - } - }); + DiscoveryDruidNode prev = nodes.putIfAbsent(druidNode.getDruidNode().getHostAndPortToUse(), druidNode); + if (prev == null) { + if (cacheInitialized) { + List newNode = ImmutableList.of(druidNode); + for (Listener l : nodeListeners) { + safeSchedule( + () -> { + l.nodesAdded(newNode); + }, + "Exception occured in nodeAdded(node=[%s]) in listener [%s].", druidNode, l + ); } - } else { - log.warn("Node[%s] discovered but existed already [%s].", druidNode, prev); } + } else { + log.warn("Node[%s] discovered but existed already [%s].", druidNode, prev); } } private void removeNode(DiscoveryDruidNode druidNode) { - synchronized (lock) { - DiscoveryDruidNode prev = nodes.remove(druidNode.getDruidNode().getHostAndPortToUse()); + DiscoveryDruidNode prev = nodes.remove(druidNode.getDruidNode().getHostAndPortToUse()); - if (prev == null) { - log.warn("Noticed disappearance of unknown druid node [%s:%s].", druidNode.getNodeType(), druidNode); - return; - } + if (prev == null) { + log.warn("Noticed disappearance of unknown druid node [%s:%s].", druidNode.getNodeType(), druidNode); + return; + } - for (DruidNodeDiscovery.Listener l : nodeListeners) { - listenerExecutor.submit(() -> { - try { - l.nodeRemoved(druidNode); - } - catch (Exception ex) { - log.error( - ex, - "Exception occured in DiscoveryDruidNode.nodeRemoved(node=[%s]) in listener [%s].", - druidNode, - l - ); - } - }); + if (cacheInitialized) { + List nodeRemoved = ImmutableList.of(druidNode); + for (Listener l : nodeListeners) { + safeSchedule( + () -> { + l.nodesRemoved(nodeRemoved); + }, + "Exception occured in nodeRemoved(node=[%s]) in listener [%s].", 
druidNode, l + ); } } } @@ -351,7 +373,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide cache.getListenable().addListener( (client, event) -> handleChildEvent(client, event) ); - cache.start(); + cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); } catch (Exception ex) { throw Throwables.propagate(ex); diff --git a/server/src/main/java/io/druid/discovery/DruidNodeDiscovery.java b/server/src/main/java/io/druid/discovery/DruidNodeDiscovery.java index 7b051ccbf00..75753695517 100644 --- a/server/src/main/java/io/druid/discovery/DruidNodeDiscovery.java +++ b/server/src/main/java/io/druid/discovery/DruidNodeDiscovery.java @@ -20,6 +20,7 @@ package io.druid.discovery; import java.util.Collection; +import java.util.List; /** * Interface for discovering Druid Nodes announced by DruidNodeAnnouncer. @@ -29,9 +30,23 @@ public interface DruidNodeDiscovery Collection getAllNodes(); void registerListener(Listener listener); + /** + * Listener for watching nodes in a DruidNodeDiscovery instance obtained via DruidNodeDiscoveryProvider.getXXX(). + * DruidNodeDiscovery implementation should assume that Listener is not threadsafe and never call methods in + * Listener concurrently. + * + * Implementation of Listener must ensure to not do any time consuming work or block in any of the methods. + */ interface Listener { - void nodeAdded(DiscoveryDruidNode node); - void nodeRemoved(DiscoveryDruidNode node); + /** + * List of nodes added. + * First call to this method is also a signal that underlying cache in the DruidNodeDiscovery implementation + * has been initialized. + * @param nodes + */ + void nodesAdded(List nodes); + + void nodesRemoved(List nodes); } } diff --git a/server/src/main/java/io/druid/discovery/DruidNodeDiscoveryProvider.java b/server/src/main/java/io/druid/discovery/DruidNodeDiscoveryProvider.java index 03584095836..0f86bfc391f 100644 --- a/server/src/main/java/io/druid/discovery/DruidNodeDiscoveryProvider.java +++ b/server/src/main/java/io/druid/discovery/DruidNodeDiscoveryProvider.java @@ -19,6 +19,7 @@ package io.druid.discovery; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.druid.java.util.common.IAE; @@ -27,6 +28,7 @@ import io.druid.java.util.common.logger.Logger; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -63,7 +65,8 @@ public abstract class DruidNodeDiscoveryProvider WorkerNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NODE_TYPE_MM) ); - private final Map serviceDiscoveryMap = new ConcurrentHashMap<>(SERVICE_TO_NODE_TYPES.size()); + private final ConcurrentHashMap serviceDiscoveryMap = new ConcurrentHashMap<>( + SERVICE_TO_NODE_TYPES.size()); /** * Get DruidNodeDiscovery instance to discover nodes of given nodeType. @@ -73,69 +76,47 @@ public abstract class DruidNodeDiscoveryProvider /** * Get DruidNodeDiscovery instance to discover nodes that announce given service in its metadata. 
*/ - public synchronized DruidNodeDiscovery getForService(String serviceName) + public DruidNodeDiscovery getForService(String serviceName) { - ServiceListener nodeDiscovery = serviceDiscoveryMap.get(serviceName); + return serviceDiscoveryMap.compute( + serviceName, + (k, v) -> { + if (v != null) { + return v; + } - if (nodeDiscovery == null) { - Set nodeTypesToWatch = DruidNodeDiscoveryProvider.SERVICE_TO_NODE_TYPES.get(serviceName); - if (nodeTypesToWatch == null) { - throw new IAE("Unknown service [%s].", serviceName); - } + Set nodeTypesToWatch = DruidNodeDiscoveryProvider.SERVICE_TO_NODE_TYPES.get(serviceName); + if (nodeTypesToWatch == null) { + throw new IAE("Unknown service [%s].", serviceName); + } - nodeDiscovery = new ServiceListener(serviceName); - for (String nodeType : nodeTypesToWatch) { - getForNodeType(nodeType).registerListener(nodeDiscovery); - } - serviceDiscoveryMap.put(serviceName, nodeDiscovery); - } - - return nodeDiscovery; + ServiceDruidNodeDiscovery serviceDiscovery = new ServiceDruidNodeDiscovery(serviceName); + for (String nodeType : nodeTypesToWatch) { + getForNodeType(nodeType).registerListener(serviceDiscovery.nodeTypeListener()); + } + return serviceDiscovery; + } + ); } - private static class ServiceListener implements DruidNodeDiscovery, DruidNodeDiscovery.Listener + private static class ServiceDruidNodeDiscovery implements DruidNodeDiscovery { + private static final Logger log = new Logger(ServiceDruidNodeDiscovery.class); + private final String service; private final Map nodes = new ConcurrentHashMap<>(); private final List listeners = new ArrayList<>(); - ServiceListener(String service) + private final Object lock = new Object(); + + private Set uninitializedNodeTypeListeners = new HashSet<>(); + + ServiceDruidNodeDiscovery(String service) { this.service = service; } - @Override - public synchronized void nodeAdded(DiscoveryDruidNode node) - { - if (node.getServices().containsKey(service)) { - DiscoveryDruidNode prev = nodes.putIfAbsent(node.getDruidNode().getHostAndPortToUse(), node); - - if (prev == null) { - for (Listener listener : listeners) { - listener.nodeAdded(node); - } - } else { - log.warn("Node[%s] discovered but already exists [%s].", node, prev); - } - } else { - log.warn("Node[%s] discovered but doesn't have service[%s]. 
Ignored.", node, service); - } - } - - @Override - public synchronized void nodeRemoved(DiscoveryDruidNode node) - { - DiscoveryDruidNode prev = nodes.remove(node.getDruidNode().getHostAndPortToUse()); - if (prev != null) { - for (Listener listener : listeners) { - listener.nodeRemoved(node); - } - } else { - log.warn("Node[%s] disappeared but was unknown for service listener [%s].", node, service); - } - } - @Override public Collection getAllNodes() { @@ -143,12 +124,91 @@ public abstract class DruidNodeDiscoveryProvider } @Override - public synchronized void registerListener(Listener listener) + public void registerListener(Listener listener) { - for (DiscoveryDruidNode node : nodes.values()) { - listener.nodeAdded(node); + synchronized (lock) { + if (uninitializedNodeTypeListeners.isEmpty()) { + listener.nodesAdded(ImmutableList.copyOf(nodes.values())); + } + listeners.add(listener); + } + } + + NodeTypeListener nodeTypeListener() + { + NodeTypeListener nodeListener = new NodeTypeListener(); + uninitializedNodeTypeListeners.add(nodeListener); + return nodeListener; + } + + class NodeTypeListener implements DruidNodeDiscovery.Listener + { + @Override + public void nodesAdded(List nodesDiscovered) + { + synchronized (lock) { + ImmutableList.Builder builder = ImmutableList.builder(); + for (DiscoveryDruidNode node : nodesDiscovered) { + if (node.getServices().containsKey(service)) { + DiscoveryDruidNode prev = nodes.putIfAbsent(node.getDruidNode().getHostAndPortToUse(), node); + + if (prev == null) { + builder.add(node); + } else { + log.warn("Node[%s] discovered but already exists [%s].", node, prev); + } + } else { + log.warn("Node[%s] discovered but doesn't have service[%s]. Ignored.", node, service); + } + } + + ImmutableList newNodesAdded = null; + if (uninitializedNodeTypeListeners.isEmpty()) { + newNodesAdded = builder.build(); + } else if (uninitializedNodeTypeListeners.remove(this) && uninitializedNodeTypeListeners.isEmpty()) { + newNodesAdded = ImmutableList.copyOf(nodes.values()); + } + + if (newNodesAdded != null) { + for (Listener listener : listeners) { + try { + listener.nodesAdded(newNodesAdded); + } + catch (Exception ex) { + log.error(ex, "Listener[%s].nodesAdded(%s) threw exception. Ignored.", listener, newNodesAdded); + } + } + } + } + } + + @Override + public void nodesRemoved(List nodesDisappeared) + { + synchronized (lock) { + ImmutableList.Builder builder = ImmutableList.builder(); + for (DiscoveryDruidNode node : nodesDisappeared) { + DiscoveryDruidNode prev = nodes.remove(node.getDruidNode().getHostAndPortToUse()); + if (prev != null) { + builder.add(node); + } else { + log.warn("Node[%s] disappeared but was unknown for service listener [%s].", node, service); + } + } + + if (uninitializedNodeTypeListeners.isEmpty()) { + ImmutableList nodesRemoved = builder.build(); + for (Listener listener : listeners) { + try { + listener.nodesRemoved(nodesRemoved); + } + catch (Exception ex) { + log.error(ex, "Listener[%s].nodesRemoved(%s) threw exception. 
Ignored.", listener, nodesRemoved); + } + } + } + } } - listeners.add(listener); } } } diff --git a/server/src/main/java/io/druid/server/DruidNode.java b/server/src/main/java/io/druid/server/DruidNode.java index a67cf1015b9..4047b511cbf 100644 --- a/server/src/main/java/io/druid/server/DruidNode.java +++ b/server/src/main/java/io/druid/server/DruidNode.java @@ -213,6 +213,15 @@ public class DruidNode return null; } + public int getPortToUse() + { + if (serverConfig.isTls()) { + return getTlsPort(); + } else { + return getPlaintextPort(); + } + } + public String getHostAndPortToUse() { return getHostAndTlsPort() != null ? getHostAndTlsPort() : getHostAndPort(); diff --git a/server/src/main/java/io/druid/server/coordination/broker/DruidBroker.java b/server/src/main/java/io/druid/server/coordination/broker/DruidBroker.java index 2c7c9f84e0e..0be3020ae87 100644 --- a/server/src/main/java/io/druid/server/coordination/broker/DruidBroker.java +++ b/server/src/main/java/io/druid/server/coordination/broker/DruidBroker.java @@ -20,12 +20,16 @@ package io.druid.server.coordination.broker; import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; - import io.druid.client.FilteredServerInventoryView; import io.druid.client.ServerView; import io.druid.curator.discovery.ServiceAnnouncer; +import io.druid.discovery.DiscoveryDruidNode; +import io.druid.discovery.DruidNodeAnnouncer; +import io.druid.discovery.DruidNodeDiscoveryProvider; +import io.druid.discovery.LookupNodeService; import io.druid.guice.ManageLifecycle; import io.druid.guice.annotations.Self; import io.druid.java.util.common.Pair; @@ -40,17 +44,28 @@ public class DruidBroker { private final DruidNode self; private final ServiceAnnouncer serviceAnnouncer; + private final DruidNodeAnnouncer druidNodeAnnouncer; + private final DiscoveryDruidNode discoveryDruidNode; + private volatile boolean started = false; @Inject public DruidBroker( final FilteredServerInventoryView serverInventoryView, final @Self DruidNode self, - final ServiceAnnouncer serviceAnnouncer - ) + final ServiceAnnouncer serviceAnnouncer, + final DruidNodeAnnouncer druidNodeAnnouncer, + final LookupNodeService lookupNodeService + ) { this.self = self; this.serviceAnnouncer = serviceAnnouncer; + this.druidNodeAnnouncer = druidNodeAnnouncer; + this.discoveryDruidNode = new DiscoveryDruidNode( + self, + DruidNodeDiscoveryProvider.NODE_TYPE_BROKER, + ImmutableMap.of(lookupNodeService.getName(), lookupNodeService) + ); serverInventoryView.registerSegmentCallback( MoreExecutors.sameThreadExecutor(), @@ -60,6 +75,7 @@ public class DruidBroker public ServerView.CallbackAction segmentViewInitialized() { serviceAnnouncer.announce(self); + druidNodeAnnouncer.announce(discoveryDruidNode); return ServerView.CallbackAction.UNREGISTER; } }, @@ -87,6 +103,7 @@ public class DruidBroker return; } serviceAnnouncer.unannounce(self); + druidNodeAnnouncer.unannounce(discoveryDruidNode); started = false; } } diff --git a/server/src/main/java/io/druid/server/http/RouterResource.java b/server/src/main/java/io/druid/server/http/RouterResource.java index 5e98e668ba9..9377f403836 100644 --- a/server/src/main/java/io/druid/server/http/RouterResource.java +++ b/server/src/main/java/io/druid/server/http/RouterResource.java @@ -20,7 +20,7 @@ package io.druid.server.http; import com.google.inject.Inject; -import io.druid.curator.discovery.ServerDiscoverySelector; +import 
io.druid.client.selector.Server; import io.druid.server.router.TieredBrokerHostSelector; import javax.ws.rs.GET; @@ -50,12 +50,12 @@ public class RouterResource @Produces(MediaType.APPLICATION_JSON) public Map> getBrokers() { - Map brokerSelectorMap = tieredBrokerHostSelector.getAllBrokers(); + Map> brokerSelectorMap = tieredBrokerHostSelector.getAllBrokers(); Map> brokersMap = new HashMap<>(brokerSelectorMap.size()); - for (Map.Entry e : brokerSelectorMap.entrySet()) { - brokersMap.put(e.getKey(), e.getValue().getAll().stream().map(s -> s.getHost()).collect(Collectors.toList())); + for (Map.Entry> e : brokerSelectorMap.entrySet()) { + brokersMap.put(e.getKey(), e.getValue().stream().map(s -> s.getHost()).collect(Collectors.toList())); } return brokersMap; diff --git a/server/src/main/java/io/druid/server/listener/announcer/ListenerDiscoverer.java b/server/src/main/java/io/druid/server/listener/announcer/ListenerDiscoverer.java deleted file mode 100644 index 63243c14a7a..00000000000 --- a/server/src/main/java/io/druid/server/listener/announcer/ListenerDiscoverer.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package io.druid.server.listener.announcer; - -import com.google.common.base.Predicate; -import com.google.common.base.Strings; -import com.google.common.base.Throwables; -import com.google.common.collect.Collections2; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.inject.Inject; -import io.druid.java.util.common.lifecycle.LifecycleStart; -import io.druid.java.util.common.lifecycle.LifecycleStop; -import io.druid.java.util.common.logger.Logger; -import io.druid.server.http.HostAndPortWithScheme; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.utils.ZKPaths; -import org.apache.zookeeper.KeeperException; - -import javax.annotation.Nullable; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - -public class ListenerDiscoverer -{ - private static final Logger LOG = new Logger(ListenerDiscoverer.class); - private volatile Map lastSeenMap = ImmutableMap.of(); - private final CuratorFramework cf; - private final ListeningAnnouncerConfig listeningAnnouncerConfig; - private final Object startStopSync = new Object(); - private volatile boolean started = false; - - @Inject - public ListenerDiscoverer( - CuratorFramework cf, - ListeningAnnouncerConfig listeningAnnouncerConfig - ) - { - this.cf = cf; - this.listeningAnnouncerConfig = listeningAnnouncerConfig; - } - - @LifecycleStart - public void start() - { - synchronized (startStopSync) { - if (started) { - LOG.debug("Already started"); - return; - } - started = true; - LOG.info("Started"); - } - } - - @LifecycleStop - public void stop() - { - synchronized (startStopSync) { - if (!started) { - LOG.debug("Already stopped"); - return; - } - LOG.info("Stopped"); - started = false; - } - } - - /** - * Get nodes at a particular listener. 
- * This method lazily adds service discovery - * - * @param listener_key The Listener's service key - * - * @return A collection of druid nodes as established by the service discovery - * - * @throws IOException if there was an error refreshing the zookeeper cache - */ - public Collection getNodes(final String listener_key) throws IOException - { - return getCurrentNodes(listener_key).keySet(); - } - - Map getCurrentNodes(final String listener_key) throws IOException - { - final HashMap retVal = new HashMap<>(); - final String zkPath = listeningAnnouncerConfig.getAnnouncementPath(listener_key); - final Collection children; - try { - children = cf.getChildren().forPath(zkPath); - } - catch (KeeperException.NoNodeException e) { - LOG.debug(e, "No path found at [%s]", zkPath); - return ImmutableMap.of(); - } - catch (Exception e) { - throw new IOException("Error getting children for " + zkPath, e); - } - for (String child : children) { - final String childPath = ZKPaths.makePath(zkPath, child); - try { - final byte[] data; - try { - data = cf.getData().decompressed().forPath(childPath); - } - catch (Exception e) { - throw new IOException("Error getting data for " + childPath, e); - } - if (data == null) { - LOG.debug("Lost data at path [%s]", childPath); - continue; - } - final HostAndPortWithScheme hostAndPortWithScheme = HostAndPortWithScheme.fromString(child); - final Long l = ByteBuffer.wrap(data).getLong(); - retVal.put(hostAndPortWithScheme, l); - } - catch (IllegalArgumentException iae) { - LOG.warn(iae, "Error parsing [%s]", childPath); - } - } - return ImmutableMap.copyOf(retVal); - } - - /** - * Get only nodes that are new since the last time getNewNodes was called (or all nodes if it has never been called) - * - * @param listener_key The listener key to look for - * - * @return A collection of nodes that are new - * - * @throws IOException If there was an error in refreshing the Zookeeper cache - */ - public synchronized Collection getNewNodes(final String listener_key) throws IOException - { - final Map priorSeenMap = lastSeenMap; - final Map currentMap = getCurrentNodes(listener_key); - final Collection retVal = Collections2.filter( - currentMap.keySet(), - new Predicate() - { - @Override - public boolean apply(HostAndPortWithScheme input) - { - final Long l = priorSeenMap.get(input); - return l == null || l < currentMap.get(input); - } - } - ); - lastSeenMap = currentMap; - return retVal; - } - - /** - * Discovers children of the listener key - * - * @param key_base The base of the listener key, or null or empty string to get all immediate children of the listener path - * - * @return A collection of the names of the children, or empty list on NoNodeException from Curator - * - * @throws IOException from Curator - * @throws RuntimeException for other exceptions from Curator. - */ - public Collection discoverChildren(@Nullable final String key_base) throws IOException - { - final String zkPath = Strings.isNullOrEmpty(key_base) - ? 
listeningAnnouncerConfig.getListenersPath() - : listeningAnnouncerConfig.getAnnouncementPath(key_base); - try { - return cf.getChildren().forPath(zkPath); - } - catch (KeeperException.NoNodeException | KeeperException.NoChildrenForEphemeralsException e) { - LOG.warn(e, "Path [%s] not discoverable", zkPath); - return ImmutableList.of(); - } - catch (Exception e) { - Throwables.propagateIfInstanceOf(e, IOException.class); - throw Throwables.propagate(e); - } - } -} diff --git a/server/src/main/java/io/druid/server/lookup/cache/LookupCoordinatorManager.java b/server/src/main/java/io/druid/server/lookup/cache/LookupCoordinatorManager.java index d0e6d363b70..53770dfa701 100644 --- a/server/src/main/java/io/druid/server/lookup/cache/LookupCoordinatorManager.java +++ b/server/src/main/java/io/druid/server/lookup/cache/LookupCoordinatorManager.java @@ -25,7 +25,6 @@ import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; @@ -47,6 +46,7 @@ import io.druid.audit.AuditInfo; import io.druid.common.config.JacksonConfigManager; import io.druid.concurrent.Execs; import io.druid.concurrent.LifecycleLock; +import io.druid.discovery.DruidNodeDiscoveryProvider; import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Smile; import io.druid.java.util.common.IAE; @@ -54,10 +54,8 @@ import io.druid.java.util.common.IOE; import io.druid.java.util.common.ISE; import io.druid.java.util.common.StreamUtils; import io.druid.java.util.common.StringUtils; -import io.druid.query.lookup.LookupModule; import io.druid.query.lookup.LookupsState; import io.druid.server.http.HostAndPortWithScheme; -import io.druid.server.listener.announcer.ListenerDiscoverer; import io.druid.server.listener.resource.ListenerResource; import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpMethod; @@ -109,7 +107,9 @@ public class LookupCoordinatorManager private static final EmittingLogger LOG = new EmittingLogger(LookupCoordinatorManager.class); - private final ListenerDiscoverer listenerDiscoverer; + private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; + private LookupNodeDiscovery lookupNodeDiscovery; + private final JacksonConfigManager configManager; private final LookupCoordinatorManagerConfig lookupCoordinatorManagerConfig; private final LookupsCommunicator lookupsCommunicator; @@ -134,32 +134,35 @@ public class LookupCoordinatorManager @Inject public LookupCoordinatorManager( final @Global HttpClient httpClient, - final ListenerDiscoverer listenerDiscoverer, + final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, final @Smile ObjectMapper smileMapper, final JacksonConfigManager configManager, final LookupCoordinatorManagerConfig lookupCoordinatorManagerConfig ) { this( - listenerDiscoverer, + druidNodeDiscoveryProvider, configManager, lookupCoordinatorManagerConfig, - new LookupsCommunicator(httpClient, lookupCoordinatorManagerConfig, smileMapper) + new LookupsCommunicator(httpClient, lookupCoordinatorManagerConfig, smileMapper), + null ); } @VisibleForTesting LookupCoordinatorManager( - final ListenerDiscoverer listenerDiscoverer, + final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, final JacksonConfigManager configManager, 
final LookupCoordinatorManagerConfig lookupCoordinatorManagerConfig, - final LookupsCommunicator lookupsCommunicator + final LookupsCommunicator lookupsCommunicator, + final LookupNodeDiscovery lookupNodeDiscovery ) { - this.listenerDiscoverer = listenerDiscoverer; + this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider; this.configManager = configManager; this.lookupCoordinatorManagerConfig = lookupCoordinatorManagerConfig; this.lookupsCommunicator = lookupsCommunicator; + this.lookupNodeDiscovery = lookupNodeDiscovery; } public boolean updateLookup( @@ -275,36 +278,26 @@ public class LookupCoordinatorManager } } - public Collection discoverTiers() + public Set discoverTiers() { - try { - Preconditions.checkState(lifecycleLock.awaitStarted(5, TimeUnit.SECONDS), "not started"); - return listenerDiscoverer.discoverChildren(LookupCoordinatorManager.LOOKUP_LISTEN_ANNOUNCE_KEY); - } - catch (IOException e) { - throw Throwables.propagate(e); - } + Preconditions.checkState(lifecycleLock.awaitStarted(5, TimeUnit.SECONDS), "not started"); + return lookupNodeDiscovery.getAllTiers(); } public Collection discoverNodesInTier(String tier) { - try { - Preconditions.checkState(lifecycleLock.awaitStarted(5, TimeUnit.SECONDS), "not started"); - return Collections2.transform( - listenerDiscoverer.getNodes(LookupModule.getTierListenerPath(tier)), - new Function() + Preconditions.checkState(lifecycleLock.awaitStarted(5, TimeUnit.SECONDS), "not started"); + return Collections2.transform( + lookupNodeDiscovery.getNodesInTier(tier), + new Function() + { + @Override + public HostAndPort apply(HostAndPortWithScheme input) { - @Override - public HostAndPort apply(HostAndPortWithScheme input) - { - return input.getHostAndPort(); - } + return input.getHostAndPort(); } - ); - } - catch (IOException e) { - throw new RuntimeException(e); - } + } + ); } public Map> getLastKnownLookupsStateOnNodes() @@ -348,6 +341,10 @@ public class LookupCoordinatorManager try { LOG.debug("Starting."); + if (lookupNodeDiscovery == null) { + lookupNodeDiscovery = new LookupNodeDiscovery(druidNodeDiscoveryProvider); + } + //first ensure that previous executorService from last cycle of start/stop has finished completely. //so that we don't have multiple live executorService instances lying around doing lookup management. if (executorService != null && @@ -522,7 +519,7 @@ public class LookupCoordinatorManager LOG.debug("Starting lookup mgmt for tier [%s].", tierEntry.getKey()); final Map tierLookups = tierEntry.getValue(); - for (final HostAndPortWithScheme node : listenerDiscoverer.getNodes(LookupModule.getTierListenerPath(tierEntry.getKey()))) { + for (final HostAndPortWithScheme node : lookupNodeDiscovery.getNodesInTier(tierEntry.getKey())) { LOG.debug( "Starting lookup mgmt for tier [%s] and host [%s:%s:%s].", diff --git a/server/src/main/java/io/druid/server/lookup/cache/LookupNodeDiscovery.java b/server/src/main/java/io/druid/server/lookup/cache/LookupNodeDiscovery.java new file mode 100644 index 00000000000..b4b56abca35 --- /dev/null +++ b/server/src/main/java/io/druid/server/lookup/cache/LookupNodeDiscovery.java @@ -0,0 +1,88 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.lookup.cache; + +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; +import com.google.common.collect.ImmutableSet; +import io.druid.discovery.DiscoveryDruidNode; +import io.druid.discovery.DruidNodeDiscovery; +import io.druid.discovery.DruidNodeDiscoveryProvider; +import io.druid.discovery.LookupNodeService; +import io.druid.server.http.HostAndPortWithScheme; + +import javax.annotation.Nullable; +import java.util.Collection; +import java.util.Set; + +/** + * A Helper class that uses DruidNodeDiscovery to discover lookup nodes and tiers. + */ +public class LookupNodeDiscovery +{ + private final DruidNodeDiscovery druidNodeDiscovery; + + LookupNodeDiscovery(DruidNodeDiscoveryProvider druidNodeDiscoveryProvider) + { + this.druidNodeDiscovery = druidNodeDiscoveryProvider.getForService(LookupNodeService.DISCOVERY_SERVICE_KEY); + } + + public Collection getNodesInTier(String tier) + { + return Collections2.transform( + Collections2.filter( + druidNodeDiscovery.getAllNodes(), + new Predicate() + { + @Override + public boolean apply(@Nullable DiscoveryDruidNode node) + { + return tier.equals(((LookupNodeService) node.getServices() + .get(LookupNodeService.DISCOVERY_SERVICE_KEY)).getLookupTier()); + } + } + ), + new Function() + { + @Override + public HostAndPortWithScheme apply(@Nullable DiscoveryDruidNode input) + { + return HostAndPortWithScheme.fromString( + input.getDruidNode().getServiceScheme(), + input.getDruidNode().getHostAndPortToUse() + ); + } + } + ); + } + + public Set getAllTiers() + { + ImmutableSet.Builder builder = new ImmutableSet.Builder<>(); + + druidNodeDiscovery.getAllNodes().stream().forEach( + node -> builder.add(((LookupNodeService) node.getServices() + .get(LookupNodeService.DISCOVERY_SERVICE_KEY)).getLookupTier()) + ); + + return builder.build(); + } +} diff --git a/server/src/main/java/io/druid/server/router/QueryHostFinder.java b/server/src/main/java/io/druid/server/router/QueryHostFinder.java index f311aeb6897..1d8f14087ee 100644 --- a/server/src/main/java/io/druid/server/router/QueryHostFinder.java +++ b/server/src/main/java/io/druid/server/router/QueryHostFinder.java @@ -24,12 +24,12 @@ import com.google.common.collect.FluentIterable; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; import io.druid.client.selector.Server; -import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; import io.druid.query.Query; import java.util.Collection; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; /** @@ -52,27 +52,27 @@ public class QueryHostFinder public Server findServer(Query query) { - final Pair selected = hostSelector.select(query); + final Pair selected = hostSelector.select(query); return findServerInner(selected); } public Server findDefaultServer() { - final Pair selected = hostSelector.getDefaultLookup(); + final Pair selected = hostSelector.getDefaultLookup(); return findServerInner(selected); } public Collection 
getAllHosts() { return FluentIterable - .from((Collection) hostSelector.getAllBrokers().values()) + .from((Collection>) hostSelector.getAllBrokers().values()) .transformAndConcat( - new Function>() + new Function, Iterable>() { @Override - public Iterable apply(ServerDiscoverySelector input) + public Iterable apply(List input) { - return input.getAll(); + return input; } } ).transform(new Function() @@ -118,16 +118,15 @@ public class QueryHostFinder return server.getHost(); } - private Server findServerInner(final Pair selected) + private Server findServerInner(final Pair selected) { if (selected == null) { log.error("Danger, Will Robinson! Unable to find any brokers!"); } final String serviceName = selected == null ? hostSelector.getDefaultServiceName() : selected.lhs; - final ServerDiscoverySelector selector = selected == null ? null : selected.rhs; + Server server = selected == null ? null : selected.rhs; - Server server = selector == null ? null : selector.pick(); if (server == null) { log.error( "WTF?! No server found for serviceName[%s]. Using backup", diff --git a/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java b/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java index 222bf2a60e4..f82d8a3e4a0 100644 --- a/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java +++ b/server/src/main/java/io/druid/server/router/TieredBrokerHostSelector.java @@ -19,14 +19,17 @@ package io.druid.server.router; +import com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; -import io.druid.client.selector.HostSelector; -import io.druid.curator.discovery.ServerDiscoveryFactory; -import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.client.selector.Server; +import io.druid.discovery.DiscoveryDruidNode; +import io.druid.discovery.DruidNodeDiscovery; +import io.druid.discovery.DruidNodeDiscoveryProvider; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.Pair; import io.druid.java.util.common.lifecycle.LifecycleStart; @@ -37,38 +40,75 @@ import io.druid.server.coordinator.rules.Rule; import org.joda.time.DateTime; import org.joda.time.Interval; -import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** */ -public class TieredBrokerHostSelector implements HostSelector +public class TieredBrokerHostSelector { private static EmittingLogger log = new EmittingLogger(TieredBrokerHostSelector.class); private final CoordinatorRuleManager ruleManager; private final TieredBrokerConfig tierConfig; - private final ServerDiscoveryFactory serverDiscoveryFactory; - private final ConcurrentHashMap selectorMap = new ConcurrentHashMap<>(); private final List strategies; + // brokerService -> broker-nodes-holder + private final ConcurrentHashMap servers = new ConcurrentHashMap<>(); + + private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; + private final Object lock = new Object(); private volatile boolean started = false; + private static final Function TO_SERVER = new Function() + { + @Override + public Server apply(final DiscoveryDruidNode instance) + { + return new Server() + { + @Override + public String getHost() + { + return 
instance.getDruidNode().getHostAndPortToUse(); + } + + @Override + public String getAddress() + { + return instance.getDruidNode().getHost(); + } + + @Override + public int getPort() + { + return instance.getDruidNode().getPortToUse(); + } + + @Override + public String getScheme() + { + return instance.getDruidNode().getServiceScheme(); + } + }; + } + }; + @Inject public TieredBrokerHostSelector( CoordinatorRuleManager ruleManager, TieredBrokerConfig tierConfig, - ServerDiscoveryFactory serverDiscoveryFactory, + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, List strategies ) { this.ruleManager = ruleManager; this.tierConfig = tierConfig; - this.serverDiscoveryFactory = serverDiscoveryFactory; + this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider; this.strategies = strategies; } @@ -80,17 +120,42 @@ public class TieredBrokerHostSelector implements HostSelector return; } - try { - for (Map.Entry entry : tierConfig.getTierToBrokerMap().entrySet()) { - ServerDiscoverySelector selector = serverDiscoveryFactory.createSelector(entry.getValue()); - selector.start(); - selectorMap.put(entry.getValue(), selector); - } - } - catch (Exception e) { - throw Throwables.propagate(e); + for (Map.Entry entry : tierConfig.getTierToBrokerMap().entrySet()) { + servers.put(entry.getValue(), new NodesHolder()); } + DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_BROKER); + druidNodeDiscovery.registerListener( + new DruidNodeDiscovery.Listener() + { + @Override + public void nodesAdded(List nodes) + { + nodes.forEach( + (node) -> { + NodesHolder nodesHolder = servers.get(node.getDruidNode().getServiceName()); + if (nodesHolder != null) { + nodesHolder.add(node.getDruidNode().getHostAndPortToUse(), TO_SERVER.apply(node)); + } + } + ); + } + + @Override + public void nodesRemoved(List nodes) + { + nodes.forEach( + (node) -> { + NodesHolder nodesHolder = servers.get(node.getDruidNode().getServiceName()); + if (nodesHolder != null) { + nodesHolder.remove(node.getDruidNode().getHostAndPortToUse()); + } + } + ); + } + } + ); + started = true; } } @@ -104,27 +169,16 @@ public class TieredBrokerHostSelector implements HostSelector return; } - try { - for (ServerDiscoverySelector selector : selectorMap.values()) { - selector.stop(); - } - } - catch (Exception e) { - throw Throwables.propagate(e); - } - started = false; } } - @Override public String getDefaultServiceName() { return tierConfig.getDefaultBrokerServiceName(); } - @Override - public Pair select(final Query query) + public Pair select(final Query query) { synchronized (lock) { if (!ruleManager.isStarted() || !started) { @@ -186,29 +240,83 @@ public class TieredBrokerHostSelector implements HostSelector brokerServiceName = tierConfig.getDefaultBrokerServiceName(); } - ServerDiscoverySelector retVal = selectorMap.get(brokerServiceName); + NodesHolder nodesHolder = servers.get(brokerServiceName); - if (retVal == null) { + if (nodesHolder == null) { log.error( - "WTF?! No selector found for brokerServiceName[%s]. Using default selector for[%s]", + "WTF?! No nodesHolder found for brokerServiceName[%s]. 
Using default selector for[%s]", brokerServiceName, tierConfig.getDefaultBrokerServiceName() ); - retVal = selectorMap.get(tierConfig.getDefaultBrokerServiceName()); + nodesHolder = servers.get(tierConfig.getDefaultBrokerServiceName()); } - return new Pair<>(brokerServiceName, retVal); + return new Pair<>(brokerServiceName, nodesHolder.pick()); } - public Pair getDefaultLookup() + public Pair getDefaultLookup() { final String brokerServiceName = tierConfig.getDefaultBrokerServiceName(); - final ServerDiscoverySelector retVal = selectorMap.get(brokerServiceName); - return new Pair<>(brokerServiceName, retVal); + return new Pair<>(brokerServiceName, servers.get(brokerServiceName).pick()); } - public Map getAllBrokers() + public Map> getAllBrokers() { - return Collections.unmodifiableMap(selectorMap); + return Maps.transformValues( + servers, + new Function>() + { + @Override + public List apply(NodesHolder input) + { + return input.getAll(); + } + } + ); + } + + private static class NodesHolder + { + private int roundRobinIndex = 0; + + private Map nodesMap = new HashMap<>(); + private ImmutableList nodes = ImmutableList.of(); + + void add(String id, Server node) + { + synchronized (this) { + nodesMap.put(id, node); + nodes = ImmutableList.copyOf(nodesMap.values()); + } + } + + void remove(String id) + { + synchronized (this) { + if (nodesMap.remove(id) != null) { + nodes = ImmutableList.copyOf(nodesMap.values()); + } + } + } + + List getAll() + { + return nodes; + } + + Server pick() + { + ImmutableList currNodes = nodes; + + if (currNodes.size() == 0) { + return null; + } + + if (roundRobinIndex >= currNodes.size()) { + roundRobinIndex %= currNodes.size(); + } + + return currNodes.get(roundRobinIndex++); + } } } diff --git a/server/src/test/java/io/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java b/server/src/test/java/io/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java index 2ddce0843d9..1203e86e365 100644 --- a/server/src/test/java/io/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java +++ b/server/src/test/java/io/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java @@ -39,6 +39,7 @@ import org.junit.Test; import java.util.Collection; import java.util.HashSet; +import java.util.List; import java.util.Set; /** @@ -128,15 +129,15 @@ public class CuratorDruidNodeAnnouncerAndDiscoveryTest extends CuratorTestBase new DruidNodeDiscovery.Listener() { @Override - public void nodeAdded(DiscoveryDruidNode node) + public void nodesAdded(List nodes) { - coordNodes.add(node); + coordNodes.addAll(nodes); } @Override - public void nodeRemoved(DiscoveryDruidNode node) + public void nodesRemoved(List nodes) { - coordNodes.remove(node); + coordNodes.removeAll(nodes); } } ); @@ -146,15 +147,15 @@ public class CuratorDruidNodeAnnouncerAndDiscoveryTest extends CuratorTestBase new DruidNodeDiscovery.Listener() { @Override - public void nodeAdded(DiscoveryDruidNode node) + public void nodesAdded(List nodes) { - overlordNodes.add(node); + overlordNodes.addAll(nodes); } @Override - public void nodeRemoved(DiscoveryDruidNode node) + public void nodesRemoved(List nodes) { - overlordNodes.remove(node); + overlordNodes.removeAll(nodes); } } ); diff --git a/server/src/test/java/io/druid/discovery/DruidNodeDiscoveryProviderTest.java b/server/src/test/java/io/druid/discovery/DruidNodeDiscoveryProviderTest.java index e9e5b0e689f..d7467b2d858 100644 --- a/server/src/test/java/io/druid/discovery/DruidNodeDiscoveryProviderTest.java +++ 
b/server/src/test/java/io/druid/discovery/DruidNodeDiscoveryProviderTest.java @@ -19,6 +19,7 @@ package io.druid.discovery; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.druid.server.DruidNode; @@ -47,15 +48,15 @@ public class DruidNodeDiscoveryProviderTest new DruidNodeDiscovery.Listener() { @Override - public void nodeAdded(DiscoveryDruidNode node) + public void nodesAdded(List nodes) { - dataNodes.add(node); + dataNodes.addAll(nodes); } @Override - public void nodeRemoved(DiscoveryDruidNode node) + public void nodesRemoved(List nodes) { - dataNodes.remove(node); + dataNodes.removeAll(nodes); } } ); @@ -66,15 +67,15 @@ public class DruidNodeDiscoveryProviderTest new DruidNodeDiscovery.Listener() { @Override - public void nodeAdded(DiscoveryDruidNode node) + public void nodesAdded(List nodes) { - lookupNodes.add(node); + lookupNodes.addAll(nodes); } @Override - public void nodeRemoved(DiscoveryDruidNode node) + public void nodesRemoved(List nodes) { - lookupNodes.remove(node); + lookupNodes.removeAll(nodes); } } ); @@ -204,14 +205,14 @@ public class DruidNodeDiscoveryProviderTest void add(DiscoveryDruidNode node) { for (DruidNodeDiscovery.Listener listener : listeners) { - listener.nodeAdded(node); + listener.nodesAdded(ImmutableList.of(node)); } } void remove(DiscoveryDruidNode node) { for (DruidNodeDiscovery.Listener listener : listeners) { - listener.nodeRemoved(node); + listener.nodesRemoved(ImmutableList.of(node)); } } } diff --git a/server/src/test/java/io/druid/server/http/LookupCoordinatorResourceTest.java b/server/src/test/java/io/druid/server/http/LookupCoordinatorResourceTest.java index 23e0b2ce91f..499c236303b 100644 --- a/server/src/test/java/io/druid/server/http/LookupCoordinatorResourceTest.java +++ b/server/src/test/java/io/druid/server/http/LookupCoordinatorResourceTest.java @@ -22,6 +22,7 @@ package io.druid.server.http; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.io.ByteSource; import com.google.common.net.HostAndPort; import io.druid.audit.AuditInfo; @@ -42,8 +43,8 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.Set; public class LookupCoordinatorResourceTest { @@ -147,7 +148,7 @@ public class LookupCoordinatorResourceTest @Test public void testDiscoveryGet() { - final List tiers = ImmutableList.of(); + final Set tiers = ImmutableSet.of(); final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock( LookupCoordinatorManager.class); EasyMock.expect(lookupCoordinatorManager.discoverTiers()).andReturn(tiers).once(); diff --git a/server/src/test/java/io/druid/server/listener/announcer/ListenerDiscovererTest.java b/server/src/test/java/io/druid/server/listener/announcer/ListenerDiscovererTest.java deleted file mode 100644 index 5bee4731fa0..00000000000 --- a/server/src/test/java/io/druid/server/listener/announcer/ListenerDiscovererTest.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.server.listener.announcer; - -import com.google.common.collect.ImmutableSet; -import io.druid.concurrent.Execs; -import io.druid.curator.CuratorTestBase; -import io.druid.curator.announcement.Announcer; -import io.druid.segment.CloserRule; -import io.druid.server.http.HostAndPortWithScheme; -import io.druid.server.initialization.ZkPathsConfig; -import org.apache.curator.utils.ZKPaths; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; - -import java.io.Closeable; -import java.io.IOException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -public class ListenerDiscovererTest extends CuratorTestBase -{ - @Rule - public CloserRule closerRule = new CloserRule(true); - - @Test(timeout = 60_000L) - public void testFullService() throws Exception - { - final String listenerKey = "listenerKey"; - final String listenerTier = "listenerTier"; - final String listenerTierChild = "tierChild"; - final String tierZkPath = ZKPaths.makePath(listenerTier, listenerTierChild); - - setupServerAndCurator(); - final ExecutorService executorService = Execs.singleThreaded("listenerDiscovererTest--%s"); - closerRule.closeLater(new Closeable() - { - @Override - public void close() throws IOException - { - executorService.shutdownNow(); - } - }); - closerRule.closeLater(server); - closerRule.closeLater(curator); - curator.start(); - curator.blockUntilConnected(10, TimeUnit.SECONDS); - Assert.assertEquals("/druid", curator.create().forPath("/druid")); - final Announcer announcer = new Announcer(curator, executorService); - closerRule.closeLater(new Closeable() - { - @Override - public void close() throws IOException - { - announcer.stop(); - } - }); - final ListeningAnnouncerConfig config = new ListeningAnnouncerConfig(new ZkPathsConfig()); - final ListenerDiscoverer listenerDiscoverer = new ListenerDiscoverer(curator, config); - listenerDiscoverer.start(); - closerRule.closeLater(new Closeable() - { - @Override - public void close() throws IOException - { - listenerDiscoverer.stop(); - } - }); - Assert.assertTrue(listenerDiscoverer.getNodes(listenerKey).isEmpty()); - - final HostAndPortWithScheme node = HostAndPortWithScheme.fromParts("http", "someHost", 8888); - final ListenerResourceAnnouncer listenerResourceAnnouncer = new ListenerResourceAnnouncer( - announcer, - config, - listenerKey, - node - ) - { - }; - listenerResourceAnnouncer.start(); - closerRule.closeLater(new Closeable() - { - @Override - public void close() throws IOException - { - listenerResourceAnnouncer.stop(); - } - }); - - final ListenerResourceAnnouncer tieredListenerResourceAnnouncer = new ListenerResourceAnnouncer( - announcer, - config, - tierZkPath, - node - ) - { - }; - tieredListenerResourceAnnouncer.start(); - closerRule.closeLater(new Closeable() - { - @Override - public void close() throws IOException - { - tieredListenerResourceAnnouncer.stop(); - } - }); - - 
announcer.start(); - - Assert.assertNotNull(curator.checkExists().forPath(config.getAnnouncementPath(listenerKey))); - // Have to wait for background syncing - while (listenerDiscoverer.getNodes(listenerKey).isEmpty()) { - // Will timeout at test's timeout setting - Thread.sleep(1); - } - Assert.assertEquals( - ImmutableSet.of(HostAndPortWithScheme.fromString(node.toString())), - listenerDiscoverer.getNodes(listenerKey) - ); - // 2nd call of two concurrent getNewNodes should return no entry collection - listenerDiscoverer.getNewNodes(listenerKey); - Assert.assertEquals( - 0, - listenerDiscoverer.getNewNodes(listenerKey).size() - ); - Assert.assertEquals( - ImmutableSet.of(listenerKey, listenerTier), - ImmutableSet.copyOf(listenerDiscoverer.discoverChildren(null)) - ); - Assert.assertEquals( - ImmutableSet.of(listenerTierChild), - ImmutableSet.copyOf(listenerDiscoverer.discoverChildren(listenerTier)) - ); - } -} diff --git a/server/src/test/java/io/druid/server/lookup/cache/LookupCoordinatorManagerTest.java b/server/src/test/java/io/druid/server/lookup/cache/LookupCoordinatorManagerTest.java index 958fea55d39..b128b0bc9b5 100644 --- a/server/src/test/java/io/druid/server/lookup/cache/LookupCoordinatorManagerTest.java +++ b/server/src/test/java/io/druid/server/lookup/cache/LookupCoordinatorManagerTest.java @@ -36,17 +36,14 @@ import com.metamx.http.client.response.HttpResponseHandler; import com.metamx.http.client.response.SequenceInputStreamResponseHandler; import io.druid.audit.AuditInfo; import io.druid.common.config.JacksonConfigManager; -import io.druid.java.util.common.StringUtils; +import io.druid.discovery.DruidNodeDiscoveryProvider; import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; -import io.druid.query.lookup.LookupModule; +import io.druid.java.util.common.StringUtils; import io.druid.query.lookup.LookupsState; import io.druid.server.http.HostAndPortWithScheme; -import io.druid.server.listener.announcer.ListenerDiscoverer; import org.easymock.EasyMock; -import org.hamcrest.BaseMatcher; -import org.hamcrest.Description; import org.joda.time.Duration; import org.junit.After; import org.junit.Assert; @@ -63,6 +60,7 @@ import java.io.InputStream; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -73,7 +71,9 @@ public class LookupCoordinatorManagerTest @Rule public ExpectedException expectedException = ExpectedException.none(); private final ObjectMapper mapper = new DefaultObjectMapper(); - private final ListenerDiscoverer discoverer = EasyMock.createStrictMock(ListenerDiscoverer.class); + private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createStrictMock(DruidNodeDiscoveryProvider.class); + private final LookupNodeDiscovery lookupNodeDiscovery = EasyMock.createStrictMock( + LookupNodeDiscovery.class); private final HttpClient client = EasyMock.createStrictMock(HttpClient.class); private final JacksonConfigManager configManager = EasyMock.createStrictMock(JacksonConfigManager.class); private final LookupCoordinatorManagerConfig lookupCoordinatorManagerConfig = new LookupCoordinatorManagerConfig(); @@ -139,6 +139,8 @@ public class LookupCoordinatorManagerTest SERVICE_EMITTER.flush(); EVENT_EMITS.set(0L); + EasyMock.reset(lookupNodeDiscovery); + EasyMock.reset(configManager); EasyMock.expect( 
configManager.watch( @@ -532,7 +534,7 @@ public class LookupCoordinatorManagerTest { final LookupCoordinatorManager manager = new LookupCoordinatorManager( client, - discoverer, + druidNodeDiscoveryProvider, mapper, configManager, lookupCoordinatorManagerConfig @@ -555,7 +557,7 @@ public class LookupCoordinatorManagerTest { final LookupCoordinatorManager manager = new LookupCoordinatorManager( client, - discoverer, + druidNodeDiscoveryProvider, mapper, configManager, lookupCoordinatorManagerConfig @@ -587,7 +589,7 @@ public class LookupCoordinatorManagerTest { final LookupCoordinatorManager manager = new LookupCoordinatorManager( client, - discoverer, + druidNodeDiscoveryProvider, mapper, configManager, lookupCoordinatorManagerConfig @@ -624,7 +626,7 @@ public class LookupCoordinatorManagerTest final AuditInfo auditInfo = new AuditInfo("author", "comment", "localhost"); final LookupCoordinatorManager manager = new LookupCoordinatorManager( client, - discoverer, + druidNodeDiscoveryProvider, mapper, configManager, lookupCoordinatorManagerConfig @@ -686,7 +688,7 @@ public class LookupCoordinatorManagerTest final AuditInfo auditInfo = new AuditInfo("author", "comment", "localhost"); final LookupCoordinatorManager manager = new LookupCoordinatorManager( client, - discoverer, + druidNodeDiscoveryProvider, mapper, configManager, lookupCoordinatorManagerConfig @@ -741,7 +743,7 @@ public class LookupCoordinatorManagerTest { final LookupCoordinatorManager manager = new LookupCoordinatorManager( client, - discoverer, + druidNodeDiscoveryProvider, mapper, configManager, lookupCoordinatorManagerConfig @@ -771,7 +773,7 @@ public class LookupCoordinatorManagerTest { final LookupCoordinatorManager manager = new LookupCoordinatorManager( client, - discoverer, + druidNodeDiscoveryProvider, mapper, configManager, lookupCoordinatorManagerConfig @@ -805,7 +807,7 @@ public class LookupCoordinatorManagerTest final AuditInfo auditInfo = new AuditInfo("author", "comment", "localhost"); final LookupCoordinatorManager manager = new LookupCoordinatorManager( client, - discoverer, + druidNodeDiscoveryProvider, mapper, configManager, lookupCoordinatorManagerConfig @@ -860,7 +862,7 @@ public class LookupCoordinatorManagerTest ); final LookupCoordinatorManager manager = new LookupCoordinatorManager( client, - discoverer, + druidNodeDiscoveryProvider, mapper, configManager, lookupCoordinatorManagerConfig @@ -905,7 +907,7 @@ public class LookupCoordinatorManagerTest ); final LookupCoordinatorManager manager = new LookupCoordinatorManager( client, - discoverer, + druidNodeDiscoveryProvider, mapper, configManager, lookupCoordinatorManagerConfig @@ -930,7 +932,7 @@ public class LookupCoordinatorManagerTest { final LookupCoordinatorManager manager = new LookupCoordinatorManager( client, - discoverer, + druidNodeDiscoveryProvider, mapper, configManager, lookupCoordinatorManagerConfig @@ -956,7 +958,7 @@ public class LookupCoordinatorManagerTest ); final LookupCoordinatorManager manager = new LookupCoordinatorManager( client, - discoverer, + druidNodeDiscoveryProvider, mapper, configManager, lookupCoordinatorManagerConfig @@ -985,7 +987,7 @@ public class LookupCoordinatorManagerTest ); final LookupCoordinatorManager manager = new LookupCoordinatorManager( client, - discoverer, + druidNodeDiscoveryProvider, mapper, configManager, lookupCoordinatorManagerConfig @@ -1010,7 +1012,7 @@ public class LookupCoordinatorManagerTest { final LookupCoordinatorManager manager = new LookupCoordinatorManager( client, - discoverer, + 
druidNodeDiscoveryProvider, mapper, configManager, lookupCoordinatorManagerConfig @@ -1052,11 +1054,11 @@ public class LookupCoordinatorManagerTest HostAndPortWithScheme host1 = HostAndPortWithScheme.fromParts("http", "host1", 1234); HostAndPortWithScheme host2 = HostAndPortWithScheme.fromParts("http", "host2", 3456); - EasyMock.reset(discoverer); + EasyMock.reset(lookupNodeDiscovery); EasyMock.expect( - discoverer.getNodes(LookupModule.getTierListenerPath("tier1")) + lookupNodeDiscovery.getNodesInTier("tier1") ).andReturn(ImmutableList.of(host1, host2)).anyTimes(); - EasyMock.replay(discoverer); + EasyMock.replay(lookupNodeDiscovery); LookupCoordinatorManager.LookupsCommunicator lookupsCommunicator = EasyMock.createMock(LookupCoordinatorManager.LookupsCommunicator.class); EasyMock.expect( @@ -1134,10 +1136,11 @@ public class LookupCoordinatorManagerTest }; final LookupCoordinatorManager manager = new LookupCoordinatorManager( - discoverer, + druidNodeDiscoveryProvider, configManager, lookupCoordinatorManagerConfig, - lookupsCommunicator + lookupsCommunicator, + lookupNodeDiscovery ); Assert.assertTrue(manager.knownOldState.get().isEmpty()); @@ -1155,7 +1158,7 @@ public class LookupCoordinatorManagerTest Thread.sleep(100); } - EasyMock.verify(discoverer, configManager, lookupsCommunicator); + EasyMock.verify(lookupNodeDiscovery, configManager, lookupsCommunicator); } @Test @@ -1163,7 +1166,7 @@ public class LookupCoordinatorManagerTest { LookupCoordinatorManager manager = new LookupCoordinatorManager( client, - discoverer, + druidNodeDiscoveryProvider, mapper, configManager, lookupCoordinatorManagerConfig @@ -1199,7 +1202,7 @@ public class LookupCoordinatorManagerTest { LookupCoordinatorManager manager = new LookupCoordinatorManager( client, - discoverer, + druidNodeDiscoveryProvider, mapper, configManager, lookupCoordinatorManagerConfig @@ -1246,7 +1249,7 @@ public class LookupCoordinatorManagerTest final LookupCoordinatorManager manager = new LookupCoordinatorManager( client, - discoverer, + druidNodeDiscoveryProvider, mapper, configManager, lookupCoordinatorManagerConfig @@ -1283,7 +1286,7 @@ public class LookupCoordinatorManagerTest final LookupCoordinatorManager manager = new LookupCoordinatorManager( client, - discoverer, + druidNodeDiscoveryProvider, mapper, configManager, lookupCoordinatorManagerConfig @@ -1327,60 +1330,56 @@ public class LookupCoordinatorManagerTest @Test public void testLookupDiscoverAll() throws Exception { - final List fakeChildren = ImmutableList.of("tier1", "tier2"); - EasyMock.reset(discoverer); - EasyMock.expect(discoverer.discoverChildren(LookupCoordinatorManager.LOOKUP_LISTEN_ANNOUNCE_KEY)) + final Set fakeChildren = ImmutableSet.of("tier1", "tier2"); + EasyMock.reset(lookupNodeDiscovery); + EasyMock.expect(lookupNodeDiscovery.getAllTiers()) .andReturn(fakeChildren) .once(); - EasyMock.replay(discoverer); + EasyMock.replay(lookupNodeDiscovery); + final LookupCoordinatorManager manager = new LookupCoordinatorManager( - client, - discoverer, - mapper, + druidNodeDiscoveryProvider, configManager, - lookupCoordinatorManagerConfig + lookupCoordinatorManagerConfig, + EasyMock.createMock(LookupCoordinatorManager.LookupsCommunicator.class), + lookupNodeDiscovery ); + manager.start(); Assert.assertEquals(fakeChildren, manager.discoverTiers()); - EasyMock.verify(discoverer); + EasyMock.verify(lookupNodeDiscovery); } @Test - public void testLookupDiscoverAllExceptional() throws Exception + public void testDiscoverNodesInTier() throws Exception { - final IOException 
ex = new IOException("some exception"); - EasyMock.reset(discoverer); - EasyMock.expect(discoverer.discoverChildren(LookupCoordinatorManager.LOOKUP_LISTEN_ANNOUNCE_KEY)) - .andThrow(ex) + EasyMock.reset(lookupNodeDiscovery); + EasyMock.expect(lookupNodeDiscovery.getNodesInTier("tier")) + .andReturn( + ImmutableSet.of( + HostAndPortWithScheme.fromParts("http", "h1", 8080), + HostAndPortWithScheme.fromParts("http", "h2", 8080) + ) + ) .once(); - expectedException.expectCause( - new BaseMatcher() - { - @Override - public boolean matches(Object o) - { - return o == ex; - } + EasyMock.replay(lookupNodeDiscovery); - @Override - public void describeTo(Description description) - { - - } - } - ); - EasyMock.replay(discoverer); final LookupCoordinatorManager manager = new LookupCoordinatorManager( - client, - discoverer, - mapper, + druidNodeDiscoveryProvider, configManager, - lookupCoordinatorManagerConfig + lookupCoordinatorManagerConfig, + EasyMock.createMock(LookupCoordinatorManager.LookupsCommunicator.class), + lookupNodeDiscovery ); manager.start(); - manager.discoverTiers(); - EasyMock.verify(discoverer); + Assert.assertEquals( + ImmutableSet.of( + HostAndPort.fromParts("h1", 8080), + HostAndPort.fromParts("h2", 8080) + ), + ImmutableSet.copyOf(manager.discoverNodesInTier("tier"))); + EasyMock.verify(lookupNodeDiscovery); } //tests that lookups stored in db from 0.10.0 are converted and restored. @@ -1434,7 +1433,7 @@ public class LookupCoordinatorManagerTest final LookupCoordinatorManager manager = new LookupCoordinatorManager( client, - discoverer, + druidNodeDiscoveryProvider, mapper, configManager, new LookupCoordinatorManagerConfig() diff --git a/server/src/test/java/io/druid/server/lookup/cache/LookupNodeDiscoveryTest.java b/server/src/test/java/io/druid/server/lookup/cache/LookupNodeDiscoveryTest.java new file mode 100644 index 00000000000..3884471fda3 --- /dev/null +++ b/server/src/test/java/io/druid/server/lookup/cache/LookupNodeDiscoveryTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.server.lookup.cache; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.druid.discovery.DiscoveryDruidNode; +import io.druid.discovery.DruidNodeDiscovery; +import io.druid.discovery.DruidNodeDiscoveryProvider; +import io.druid.discovery.LookupNodeService; +import io.druid.server.DruidNode; +import io.druid.server.http.HostAndPortWithScheme; +import io.druid.server.initialization.ServerConfig; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + */ +public class LookupNodeDiscoveryTest +{ + private DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; + private DruidNodeDiscovery druidNodeDiscovery; + private LookupNodeDiscovery lookupNodeDiscovery; + + @Before + public void setup() + { + druidNodeDiscoveryProvider = EasyMock.createStrictMock(DruidNodeDiscoveryProvider.class); + + druidNodeDiscovery = EasyMock.createStrictMock(DruidNodeDiscovery.class); + + EasyMock.expect(druidNodeDiscoveryProvider.getForService(LookupNodeService.DISCOVERY_SERVICE_KEY)) + .andReturn(druidNodeDiscovery); + + DiscoveryDruidNode node1 = new DiscoveryDruidNode( + new DruidNode("s1", "h1", 8080, null, new ServerConfig()), + DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL, + ImmutableMap.of( + LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier1")) + ); + + DiscoveryDruidNode node2 = new DiscoveryDruidNode( + new DruidNode("s2", "h2", 8080, null, new ServerConfig()), + DruidNodeDiscoveryProvider.NODE_TYPE_PEON, + ImmutableMap.of( + LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier1")) + ); + + DiscoveryDruidNode node3 = new DiscoveryDruidNode( + new DruidNode("s3", "h3", 8080, null, new ServerConfig()), + DruidNodeDiscoveryProvider.NODE_TYPE_PEON, + ImmutableMap.of( + LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier2")) + ); + + EasyMock.expect(druidNodeDiscovery.getAllNodes()) + .andReturn(ImmutableSet.of(node1, node2, node3)) + .anyTimes();; + + EasyMock.replay(druidNodeDiscoveryProvider, druidNodeDiscovery); + + lookupNodeDiscovery = new LookupNodeDiscovery(druidNodeDiscoveryProvider); + } + + @Test + public void testGetNodesInTier() throws Exception + { + Assert.assertEquals( + ImmutableList.of( + HostAndPortWithScheme.fromParts("http", "h1", 8080), + HostAndPortWithScheme.fromParts("http", "h2", 8080) + ), + ImmutableList.copyOf(lookupNodeDiscovery.getNodesInTier("tier1")) + ); + + Assert.assertEquals( + ImmutableList.of( + HostAndPortWithScheme.fromParts("http", "h3", 8080) + ), + ImmutableList.copyOf(lookupNodeDiscovery.getNodesInTier("tier2")) + ); + + Assert.assertEquals( + ImmutableList.of(), + ImmutableList.copyOf(lookupNodeDiscovery.getNodesInTier("tier3")) + ); + + EasyMock.verify(druidNodeDiscoveryProvider, druidNodeDiscovery); + } + + @Test + public void testGetAllTiers() throws Exception + { + Assert.assertEquals( + ImmutableSet.of("tier1", "tier2"), + lookupNodeDiscovery.getAllTiers() + ); + + EasyMock.verify(druidNodeDiscoveryProvider, druidNodeDiscovery); + } +} diff --git a/server/src/test/java/io/druid/server/router/QueryHostFinderTest.java b/server/src/test/java/io/druid/server/router/QueryHostFinderTest.java index 1f9c3f2f0fd..78e22f159f8 100644 --- a/server/src/test/java/io/druid/server/router/QueryHostFinderTest.java +++ b/server/src/test/java/io/druid/server/router/QueryHostFinderTest.java @@ -19,10 +19,7 @@ package io.druid.server.router; 
-import com.google.common.collect.ImmutableMap; -import io.druid.client.DruidServer; import io.druid.client.selector.Server; -import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.java.util.common.Intervals; import io.druid.java.util.common.Pair; import io.druid.query.Query; @@ -37,44 +34,19 @@ import org.junit.Before; import org.junit.Test; import java.util.Arrays; -import java.util.LinkedHashMap; /** */ public class QueryHostFinderTest { - private ServerDiscoverySelector selector; private TieredBrokerHostSelector brokerSelector; - private TieredBrokerConfig config; private Server server; @Before public void setUp() throws Exception { - selector = EasyMock.createMock(ServerDiscoverySelector.class); brokerSelector = EasyMock.createMock(TieredBrokerHostSelector.class); - config = new TieredBrokerConfig() - { - @Override - public LinkedHashMap getTierToBrokerMap() - { - return new LinkedHashMap<>( - ImmutableMap.of( - "hot", "hotBroker", - "medium", "mediumBroker", - DruidServer.DEFAULT_TIER, "coldBroker" - ) - ); - } - - @Override - public String getDefaultBrokerServiceName() - { - return "hotBroker"; - } - }; - server = new Server() { @Override @@ -101,24 +73,22 @@ public class QueryHostFinderTest return 0; } }; + + EasyMock.expect(brokerSelector.select(EasyMock.anyObject(Query.class))).andReturn( + Pair.of("service", server) + ); + EasyMock.replay(brokerSelector); } @After public void tearDown() throws Exception { EasyMock.verify(brokerSelector); - EasyMock.verify(selector); } @Test public void testFindServer() throws Exception { - EasyMock.expect(brokerSelector.select(EasyMock.anyObject())).andReturn(new Pair("hotBroker", selector)); - EasyMock.replay(brokerSelector); - - EasyMock.expect(selector.pick()).andReturn(server).once(); - EasyMock.replay(selector); - QueryHostFinder queryRunner = new QueryHostFinder( brokerSelector ); diff --git a/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java b/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java index 5f9ccc4f91c..a2c253dd405 100644 --- a/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java +++ b/server/src/test/java/io/druid/server/router/TieredBrokerHostSelectorTest.java @@ -20,21 +20,32 @@ package io.druid.server.router; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.metamx.http.client.HttpClient; import io.druid.client.DruidServer; -import io.druid.curator.discovery.ServerDiscoveryFactory; -import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.client.selector.Server; +import io.druid.discovery.DiscoveryDruidNode; +import io.druid.discovery.DruidNodeDiscovery; +import io.druid.discovery.DruidNodeDiscoveryProvider; import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Json; import io.druid.java.util.common.Intervals; +import io.druid.java.util.common.Pair; import io.druid.query.Druids; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.spec.MultipleIntervalSegmentSpec; +import io.druid.query.timeseries.TimeseriesQuery; +import io.druid.server.DruidNode; import 
io.druid.server.coordinator.rules.IntervalLoadRule; import io.druid.server.coordinator.rules.Rule; +import io.druid.server.initialization.ServerConfig; import org.easymock.EasyMock; import org.joda.time.Interval; import org.junit.After; @@ -42,7 +53,9 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import javax.annotation.Nullable; import java.util.Arrays; +import java.util.Collection; import java.util.LinkedHashMap; import java.util.List; @@ -50,18 +63,59 @@ import java.util.List; */ public class TieredBrokerHostSelectorTest { - private ServerDiscoveryFactory factory; - private ServerDiscoverySelector selector; + private DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; + private DruidNodeDiscovery druidNodeDiscovery; private TieredBrokerHostSelector brokerSelector; + private DiscoveryDruidNode node1; + private DiscoveryDruidNode node2; + private DiscoveryDruidNode node3; + @Before public void setUp() throws Exception { - factory = EasyMock.createMock(ServerDiscoveryFactory.class); - selector = EasyMock.createMock(ServerDiscoverySelector.class); + druidNodeDiscoveryProvider = EasyMock.createStrictMock(DruidNodeDiscoveryProvider.class); + + node1 = new DiscoveryDruidNode( + new DruidNode("hotBroker", "hotHost", 8080, null, new ServerConfig()), + DruidNodeDiscoveryProvider.NODE_TYPE_BROKER, + ImmutableMap.of() + ); + + node2 = new DiscoveryDruidNode( + new DruidNode("coldBroker", "coldHost1", 8080, null, new ServerConfig()), + DruidNodeDiscoveryProvider.NODE_TYPE_BROKER, + ImmutableMap.of() + ); + + node3 = new DiscoveryDruidNode( + new DruidNode("coldBroker", "coldHost2", 8080, null, new ServerConfig()), + DruidNodeDiscoveryProvider.NODE_TYPE_BROKER, + ImmutableMap.of() + ); + + druidNodeDiscovery = new DruidNodeDiscovery() + { + @Override + public Collection getAllNodes() + { + return ImmutableSet.of(node1, node2, node3); + } + + @Override + public void registerListener(Listener listener) + { + listener.nodesAdded(ImmutableList.of(node1, node2, node3)); + } + }; + + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_BROKER)) + .andReturn(druidNodeDiscovery);; + + EasyMock.replay(druidNodeDiscoveryProvider); brokerSelector = new TieredBrokerHostSelector( - new TestRuleManager(null, null, null, null), + new TestRuleManager(null, null, null), new TieredBrokerConfig() { @Override @@ -82,20 +136,11 @@ public class TieredBrokerHostSelectorTest return "hotBroker"; } }, - factory, + druidNodeDiscoveryProvider, Arrays.asList(new TimeBoundaryTieredBrokerSelectorStrategy(), new PriorityTieredBrokerSelectorStrategy(0, 1)) ); - EasyMock.expect(factory.createSelector(EasyMock.anyObject())).andReturn(selector).atLeastOnce(); - EasyMock.replay(factory); - - selector.start(); - EasyMock.expectLastCall().atLeastOnce(); - selector.stop(); - EasyMock.expectLastCall().atLeastOnce(); - EasyMock.replay(selector); brokerSelector.start(); - } @After @@ -103,39 +148,47 @@ public class TieredBrokerHostSelectorTest { brokerSelector.stop(); - EasyMock.verify(selector); - EasyMock.verify(factory); + EasyMock.verify(druidNodeDiscoveryProvider); } @Test public void testBasicSelect() throws Exception { - String brokerName = (String) brokerSelector.select( - Druids.newTimeseriesQueryBuilder() - .dataSource("test") - .granularity("all") - .aggregators(Arrays.asList(new CountAggregatorFactory("rows"))) - .intervals(Arrays.asList(Intervals.of("2011-08-31/2011-09-01"))) - .build() - ).lhs; + TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() 
+ .dataSource("test") + .granularity("all") + .aggregators(Arrays.asList(new CountAggregatorFactory("rows"))) + .intervals(Arrays.asList(Intervals.of("2011-08-31/2011-09-01"))) + .build(); - Assert.assertEquals("coldBroker", brokerName); + Pair p = brokerSelector.select(query); + Assert.assertEquals("coldBroker", p.lhs); + Assert.assertEquals("coldHost1:8080", p.rhs.getHost()); + + p = brokerSelector.select(query); + Assert.assertEquals("coldBroker", p.lhs); + Assert.assertEquals("coldHost2:8080", p.rhs.getHost()); + + p = brokerSelector.select(query); + Assert.assertEquals("coldBroker", p.lhs); + Assert.assertEquals("coldHost1:8080", p.rhs.getHost()); } @Test public void testBasicSelect2() throws Exception { - String brokerName = (String) brokerSelector.select( + Pair p = brokerSelector.select( Druids.newTimeseriesQueryBuilder() .dataSource("test") .granularity("all") .aggregators(Arrays.asList(new CountAggregatorFactory("rows"))) .intervals(Arrays.asList(Intervals.of("2013-08-31/2013-09-01"))) .build() - ).lhs; + ); - Assert.assertEquals("hotBroker", brokerName); + Assert.assertEquals("hotBroker", p.lhs); + Assert.assertEquals("hotHost:8080", p.rhs.getHost()); } @Test @@ -241,16 +294,38 @@ public class TieredBrokerHostSelectorTest Assert.assertEquals("hotBroker", brokerName); } + @Test + public void testGetAllBrokers() + { + Assert.assertEquals( + ImmutableMap.of( + "mediumBroker", ImmutableList.of(), + "coldBroker", ImmutableList.of("coldHost1:8080", "coldHost2:8080"), + "hotBroker", ImmutableList.of("hotHost:8080") + ), + Maps.transformValues( + brokerSelector.getAllBrokers(), + new Function, List>() + { + @Override + public List apply(@Nullable List servers) + { + return Lists.transform(servers, server -> server.getHost()); + } + } + ) + ); + } + private static class TestRuleManager extends CoordinatorRuleManager { public TestRuleManager( @Global HttpClient httpClient, @Json ObjectMapper jsonMapper, - Supplier config, - ServerDiscoverySelector selector + Supplier config ) { - super(httpClient, jsonMapper, config, selector); + super(httpClient, jsonMapper, config, null); } @Override diff --git a/services/src/main/java/io/druid/cli/CliBroker.java b/services/src/main/java/io/druid/cli/CliBroker.java index 3bade6247bf..95586e7af29 100644 --- a/services/src/main/java/io/druid/cli/CliBroker.java +++ b/services/src/main/java/io/druid/cli/CliBroker.java @@ -21,7 +21,6 @@ package io.druid.cli; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; -import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.name.Names; import io.airlift.airline.Command; @@ -34,8 +33,6 @@ import io.druid.client.cache.CacheMonitor; import io.druid.client.selector.CustomTierSelectorStrategyConfig; import io.druid.client.selector.ServerSelectorStrategy; import io.druid.client.selector.TierSelectorStrategy; -import io.druid.discovery.DruidNodeDiscoveryProvider; -import io.druid.discovery.LookupNodeService; import io.druid.guice.CacheModule; import io.druid.guice.DruidProcessingModule; import io.druid.guice.Jerseys; @@ -123,14 +120,6 @@ public class CliBroker extends ServerRunnable MetricsModule.register(binder, CacheMonitor.class); LifecycleModule.register(binder, Server.class); - - binder.bind(DiscoverySideEffectsProvider.Child.class).toProvider( - new DiscoverySideEffectsProvider( - DruidNodeDiscoveryProvider.NODE_TYPE_BROKER, - ImmutableList.of(LookupNodeService.class) - ) - ).in(LazySingleton.class); - LifecycleModule.registerKey(binder, 
Key.get(DiscoverySideEffectsProvider.Child.class));
           }
         },
         new LookupModule(),
diff --git a/services/src/main/java/io/druid/cli/CliCoordinator.java b/services/src/main/java/io/druid/cli/CliCoordinator.java
index 939ddeaceb5..3aff7ddf15e 100644
--- a/services/src/main/java/io/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/io/druid/cli/CliCoordinator.java
@@ -75,7 +75,6 @@ import io.druid.server.http.RulesResource;
 import io.druid.server.http.ServersResource;
 import io.druid.server.http.TiersResource;
 import io.druid.server.initialization.jetty.JettyServerInitializer;
-import io.druid.server.listener.announcer.ListenerDiscoverer;
 import io.druid.server.lookup.cache.LookupCoordinatorManager;
 import io.druid.server.lookup.cache.LookupCoordinatorManagerConfig;
 import io.druid.server.router.TieredBrokerConfig;
@@ -168,9 +167,6 @@ public class CliCoordinator extends ServerRunnable
         binder.bind(LookupCoordinatorManager.class).in(LazySingleton.class);
         binder.bind(DruidCoordinator.class);
 
-        binder.bind(ListenerDiscoverer.class).in(ManageLifecycle.class);
-
-        LifecycleModule.register(binder, ListenerDiscoverer.class);
         LifecycleModule.register(binder, MetadataStorage.class);
         LifecycleModule.register(binder, DruidCoordinator.class);
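
For reference, the pattern the tests in this patch exercise, gathered in one place: discovery listeners now receive nodes in batches via nodesAdded/nodesRemoved, and lookup tier discovery goes through LookupNodeDiscovery rather than the removed ListenerDiscoverer. The sketch below is illustrative only and not part of the patch; the class name, its placement in the io.druid.server.lookup.cache package, and the generic type parameters (which this rendering does not show) are assumptions based on the test code above.

// Illustrative sketch only; class name, package placement, and generics are assumed.
package io.druid.server.lookup.cache;

import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeDiscovery;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.discovery.LookupNodeService;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class LookupDiscoverySketch
{
  public static Set<DiscoveryDruidNode> watchLookupNodes(DruidNodeDiscoveryProvider provider)
  {
    // Listener callbacks now carry batches of nodes instead of one nodeAdded/nodeRemoved per node.
    // A real caller would guard this set against concurrent callbacks; kept simple here.
    final Set<DiscoveryDruidNode> knownNodes = new HashSet<>();

    DruidNodeDiscovery discovery = provider.getForService(LookupNodeService.DISCOVERY_SERVICE_KEY);
    discovery.registerListener(
        new DruidNodeDiscovery.Listener()
        {
          @Override
          public void nodesAdded(List<DiscoveryDruidNode> nodes)
          {
            knownNodes.addAll(nodes);
          }

          @Override
          public void nodesRemoved(List<DiscoveryDruidNode> nodes)
          {
            knownNodes.removeAll(nodes);
          }
        }
    );

    // Tier-level queries go through LookupNodeDiscovery, as in LookupNodeDiscoveryTest above.
    LookupNodeDiscovery lookupNodeDiscovery = new LookupNodeDiscovery(provider);
    for (String tier : lookupNodeDiscovery.getAllTiers()) {
      // getNodesInTier(tier) yields the HostAndPortWithScheme of lookup nodes serving that tier.
      lookupNodeDiscovery.getNodesInTier(tier);
    }

    return knownNodes;
  }
}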