From dc6ed7b8d4208e49e2fc34e43e8c26a74f249e87 Mon Sep 17 00:00:00 2001 From: Ryan Ernst Date: Thu, 3 Nov 2016 08:20:20 -0700 Subject: [PATCH] Remove pluggability of ZenPing (#21049) Plugins: Remove pluggability of ZenPing ZenPing is the part of zen discovery which knows how to ping nodes. There is only one alternative implementation, which is just for testing. This change removes the ability to add custom zen pings, and instead hooks in the MockZenPing for tests through an overridden method in MockNode. This also folds in the ZenPingService (which was really just a single method) into ZenDiscovery, and removes the idea of having multiple ZenPing instances. Finally, this was the last usage of the ExtensionPoint classes, so that is also removed here. --- .../component/AbstractLifecycleComponent.java | 11 +- .../common/util/ExtensionPoint.java | 250 ------------------ .../discovery/DiscoveryModule.java | 74 +++--- .../discovery/zen/FaultDetection.java | 5 +- .../discovery/zen/UnicastZenPing.java | 73 ++--- .../discovery/zen/ZenDiscovery.java | 48 ++-- .../elasticsearch/discovery/zen/ZenPing.java | 22 +- .../discovery/zen/ZenPingService.java | 105 -------- .../java/org/elasticsearch/node/Node.java | 23 +- .../elasticsearch/tribe/TribeClientNode.java | 36 --- .../org/elasticsearch/tribe/TribeService.java | 4 +- .../common/util/ExtensionPointTests.java | 63 ----- .../discovery/DiscoveryModuleTests.java | 5 +- .../DiscoveryWithServiceDisruptionsIT.java | 24 +- .../discovery/zen/UnicastZenPingTests.java | 35 +-- .../discovery/zen/ZenDiscoveryUnitTests.java | 4 +- .../java/org/elasticsearch/tribe/TribeIT.java | 24 +- docs/plugins/discovery-file.asciidoc | 2 +- .../elasticsearch/tribe/TribeUnitTests.java | 4 +- .../java/org/elasticsearch/node/MockNode.java | 18 ++ .../test/discovery/MockZenPing.java | 32 +-- 21 files changed, 210 insertions(+), 652 deletions(-) delete mode 100644 core/src/main/java/org/elasticsearch/common/util/ExtensionPoint.java delete mode 100644 core/src/main/java/org/elasticsearch/discovery/zen/ZenPingService.java delete mode 100644 core/src/main/java/org/elasticsearch/tribe/TribeClientNode.java delete mode 100644 core/src/test/java/org/elasticsearch/common/util/ExtensionPointTests.java diff --git a/core/src/main/java/org/elasticsearch/common/component/AbstractLifecycleComponent.java b/core/src/main/java/org/elasticsearch/common/component/AbstractLifecycleComponent.java index e2868b23e89..2ed43ccaa24 100644 --- a/core/src/main/java/org/elasticsearch/common/component/AbstractLifecycleComponent.java +++ b/core/src/main/java/org/elasticsearch/common/component/AbstractLifecycleComponent.java @@ -21,6 +21,7 @@ package org.elasticsearch.common.component; import org.elasticsearch.common.settings.Settings; +import java.io.IOException; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; @@ -101,11 +102,17 @@ public abstract class AbstractLifecycleComponent extends AbstractComponent imple listener.beforeClose(); } lifecycle.moveToClosed(); - doClose(); + try { + doClose(); + } catch (IOException e) { + // TODO: we need to separate out closing (ie shutting down) services, vs releasing runtime transient + // structures. Shutting down services should use IOUtils.close + logger.warn("failed to close " + getClass().getName(), e); + } for (LifecycleListener listener : listeners) { listener.afterClose(); } } - protected abstract void doClose(); + protected abstract void doClose() throws IOException; } diff --git a/core/src/main/java/org/elasticsearch/common/util/ExtensionPoint.java b/core/src/main/java/org/elasticsearch/common/util/ExtensionPoint.java deleted file mode 100644 index a5dac12fab7..00000000000 --- a/core/src/main/java/org/elasticsearch/common/util/ExtensionPoint.java +++ /dev/null @@ -1,250 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.util; - -import org.elasticsearch.common.inject.Binder; -import org.elasticsearch.common.inject.multibindings.MapBinder; -import org.elasticsearch.common.inject.multibindings.Multibinder; -import org.elasticsearch.common.settings.Settings; - -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -/** - * This class defines an official elasticsearch extension point. It registers - * all extensions by a single name and ensures that extensions are not registered - * more than once. - */ -public abstract class ExtensionPoint { - protected final String name; - protected final Class[] singletons; - - /** - * Creates a new extension point - * - * @param name the human readable underscore case name of the extension point. This is used in error messages etc. - * @param singletons a list of singletons to bind with this extension point - these are bound in {@link #bind(Binder)} - */ - public ExtensionPoint(String name, Class... singletons) { - this.name = name; - this.singletons = singletons; - } - - /** - * Binds the extension as well as the singletons to the given guice binder. - * - * @param binder the binder to use - */ - public final void bind(Binder binder) { - for (Class c : singletons) { - binder.bind(c).asEagerSingleton(); - } - bindExtensions(binder); - } - - /** - * Subclasses can bind their type, map or set extensions here. - */ - protected abstract void bindExtensions(Binder binder); - - /** - * A map based extension point which allows to register keyed implementations ie. parsers or some kind of strategies. - */ - public static class ClassMap extends ExtensionPoint { - protected final Class extensionClass; - protected final Map> extensions = new HashMap<>(); - private final Set reservedKeys; - - /** - * Creates a new {@link ClassMap} - * - * @param name the human readable underscore case name of the extension point. This is used in error messages etc. - * @param extensionClass the base class that should be extended - * @param singletons a list of singletons to bind with this extension point - these are bound in {@link #bind(Binder)} - * @param reservedKeys a set of reserved keys by internal implementations - */ - public ClassMap(String name, Class extensionClass, Set reservedKeys, Class... singletons) { - super(name, singletons); - this.extensionClass = extensionClass; - this.reservedKeys = reservedKeys; - } - - /** - * Returns the extension for the given key or null - */ - public Class getExtension(String type) { - return extensions.get(type); - } - - /** - * Registers an extension class for a given key. This method will thr - * - * @param key the extensions key - * @param extension the extension - * @throws IllegalArgumentException iff the key is already registered or if the key is a reserved key for an internal implementation - */ - public final void registerExtension(String key, Class extension) { - if (extensions.containsKey(key) || reservedKeys.contains(key)) { - throw new IllegalArgumentException("Can't register the same [" + this.name + "] more than once for [" + key + "]"); - } - extensions.put(key, extension); - } - - @Override - protected final void bindExtensions(Binder binder) { - MapBinder parserMapBinder = MapBinder.newMapBinder(binder, String.class, extensionClass); - for (Map.Entry> clazz : extensions.entrySet()) { - parserMapBinder.addBinding(clazz.getKey()).to(clazz.getValue()); - } - } - } - - /** - * A Type extension point which basically allows to registered keyed extensions like {@link ClassMap} - * but doesn't instantiate and bind all the registered key value pairs but instead replace a singleton based on a given setting via {@link #bindType(Binder, Settings, String, String)} - * Note: {@link #bind(Binder)} is not supported by this class - */ - public static final class SelectedType extends ClassMap { - - public SelectedType(String name, Class extensionClass) { - super(name, extensionClass, Collections.emptySet()); - } - - /** - * Binds the extension class to the class that is registered for the give configured for the settings key in - * the settings object. - * - * @param binder the binder to use - * @param settings the settings to look up the key to find the implementation to bind - * @param settingsKey the key to use with the settings - * @param defaultValue the default value if the settings do not contain the key, or null if there is no default - * @return the actual bound type key - */ - public String bindType(Binder binder, Settings settings, String settingsKey, String defaultValue) { - final String type = settings.get(settingsKey, defaultValue); - if (type == null) { - throw new IllegalArgumentException("Missing setting [" + settingsKey + "]"); - } - final Class instance = getExtension(type); - if (instance == null) { - throw new IllegalArgumentException("Unknown [" + this.name + "] type [" + type + "] possible values: " - + extensions.keySet()); - } - if (extensionClass == instance) { - binder.bind(extensionClass).asEagerSingleton(); - } else { - binder.bind(extensionClass).to(instance).asEagerSingleton(); - } - return type; - } - - } - - /** - * A set based extension point which allows to register extended classes that might be used to chain additional functionality etc. - */ - public static final class ClassSet extends ExtensionPoint { - protected final Class extensionClass; - private final Set> extensions = new HashSet<>(); - - /** - * Creates a new {@link ClassSet} - * - * @param name the human readable underscore case name of the extension point. This is used in error messages etc. - * @param extensionClass the base class that should be extended - * @param singletons a list of singletons to bind with this extension point - these are bound in {@link #bind(Binder)} - */ - public ClassSet(String name, Class extensionClass, Class... singletons) { - super(name, singletons); - this.extensionClass = extensionClass; - } - - /** - * Registers a new extension - * - * @param extension the extension to register - * @throws IllegalArgumentException iff the class is already registered - */ - public void registerExtension(Class extension) { - if (extensions.contains(extension)) { - throw new IllegalArgumentException("Can't register the same [" + this.name + "] more than once for [" + extension.getName() + "]"); - } - extensions.add(extension); - } - - @Override - protected void bindExtensions(Binder binder) { - Multibinder allocationMultibinder = Multibinder.newSetBinder(binder, extensionClass); - for (Class clazz : extensions) { - binder.bind(clazz).asEagerSingleton(); - allocationMultibinder.addBinding().to(clazz); - } - } - - public boolean isEmpty() { - return extensions.isEmpty(); - } - } - - /** - * A an instance of a map, mapping one instance value to another. Both key and value are instances, not classes - * like with other extension points. - */ - public static final class InstanceMap extends ExtensionPoint { - private final Map map = new HashMap<>(); - private final Class keyType; - private final Class valueType; - - /** - * Creates a new {@link ClassSet} - * - * @param name the human readable underscore case name of the extension point. This is used in error messages. - * @param singletons a list of singletons to bind with this extension point - these are bound in {@link #bind(Binder)} - */ - public InstanceMap(String name, Class keyType, Class valueType, Class... singletons) { - super(name, singletons); - this.keyType = keyType; - this.valueType = valueType; - } - - /** - * Registers a mapping from {@code key} to {@code value} - * - * @throws IllegalArgumentException iff the key is already registered - */ - public void registerExtension(K key, V value) { - V old = map.put(key, value); - if (old != null) { - throw new IllegalArgumentException("Cannot register [" + this.name + "] with key [" + key + "] to [" + value + "], already registered to [" + old + "]"); - } - } - - @Override - protected void bindExtensions(Binder binder) { - MapBinder mapBinder = MapBinder.newMapBinder(binder, keyType, valueType); - for (Map.Entry entry : map.entrySet()) { - mapBinder.addBinding(entry.getKey()).toInstance(entry.getValue()); - } - } - } -} diff --git a/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java b/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java index ef3018c19b7..61316a852bb 100644 --- a/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java +++ b/core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java @@ -19,20 +19,6 @@ package org.elasticsearch.discovery; -import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.common.network.NetworkService; -import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.Setting.Property; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.ExtensionPoint; -import org.elasticsearch.discovery.zen.ZenDiscovery; -import org.elasticsearch.plugins.DiscoveryPlugin; -import org.elasticsearch.transport.TransportService; -import org.elasticsearch.discovery.zen.ZenPing; -import org.elasticsearch.discovery.zen.ZenPingService; -import org.elasticsearch.discovery.zen.UnicastHostsProvider; -import org.elasticsearch.discovery.zen.UnicastZenPing; - import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -41,6 +27,16 @@ import java.util.Objects; import java.util.function.Function; import java.util.function.Supplier; +import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.network.NetworkService; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Setting.Property; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.discovery.zen.UnicastHostsProvider; +import org.elasticsearch.discovery.zen.ZenDiscovery; +import org.elasticsearch.plugins.DiscoveryPlugin; +import org.elasticsearch.transport.TransportService; + /** * A module for loading classes for node discovery. */ @@ -52,8 +48,7 @@ public class DiscoveryModule extends AbstractModule { new Setting<>("discovery.zen.hosts_provider", DISCOVERY_TYPE_SETTING, Function.identity(), Property.NodeScope); private final Settings settings; - private final Map> unicastHostProviders; - private final ExtensionPoint.ClassSet zenPings = new ExtensionPoint.ClassSet<>("zen_ping", ZenPing.class); + private final UnicastHostsProvider hostsProvider; private final Map> discoveryTypes = new HashMap<>(); public DiscoveryModule(Settings settings, TransportService transportService, NetworkService networkService, @@ -62,16 +57,30 @@ public class DiscoveryModule extends AbstractModule { addDiscoveryType("none", NoneDiscovery.class); addDiscoveryType("zen", ZenDiscovery.class); - Map> hostProviders = new HashMap<>(); - hostProviders.put("zen", () -> Collections::emptyList); - for (DiscoveryPlugin plugin : plugins) { - plugin.getZenHostsProviders(transportService, networkService).entrySet().forEach(entry -> { - if (hostProviders.put(entry.getKey(), entry.getValue()) != null) { - throw new IllegalArgumentException("Cannot specify zen hosts provider [" + entry.getKey() + "] twice"); - } - }); + String discoveryType = DISCOVERY_TYPE_SETTING.get(settings); + if (discoveryType.equals("none") == false) { + Map> hostProviders = new HashMap<>(); + hostProviders.put("zen", () -> Collections::emptyList); + for (DiscoveryPlugin plugin : plugins) { + plugin.getZenHostsProviders(transportService, networkService).entrySet().forEach(entry -> { + if (hostProviders.put(entry.getKey(), entry.getValue()) != null) { + throw new IllegalArgumentException("Cannot specify zen hosts provider [" + entry.getKey() + "] twice"); + } + }); + } + String hostsProviderName = DISCOVERY_HOSTS_PROVIDER_SETTING.get(settings); + Supplier hostsProviderSupplier = hostProviders.get(hostsProviderName); + if (hostsProviderSupplier == null) { + throw new IllegalArgumentException("Unknown zen hosts provider [" + hostsProviderName + "]"); + } + hostsProvider = Objects.requireNonNull(hostsProviderSupplier.get()); + } else { + hostsProvider = null; } - unicastHostProviders = Collections.unmodifiableMap(hostProviders); + } + + public UnicastHostsProvider getHostsProvider() { + return hostsProvider; } /** @@ -84,10 +93,6 @@ public class DiscoveryModule extends AbstractModule { discoveryTypes.put(type, clazz); } - public void addZenPing(Class clazz) { - zenPings.registerExtension(clazz); - } - @Override protected void configure() { String discoveryType = DISCOVERY_TYPE_SETTING.get(settings); @@ -97,18 +102,7 @@ public class DiscoveryModule extends AbstractModule { } if (discoveryType.equals("none") == false) { - bind(ZenPingService.class).asEagerSingleton(); - String hostsProviderName = DISCOVERY_HOSTS_PROVIDER_SETTING.get(settings); - Supplier hostsProviderSupplier = unicastHostProviders.get(hostsProviderName); - if (hostsProviderSupplier == null) { - throw new IllegalArgumentException("Unknown zen hosts provider [" + hostsProviderName + "]"); - } - UnicastHostsProvider hostsProvider = Objects.requireNonNull(hostsProviderSupplier.get()); bind(UnicastHostsProvider.class).toInstance(hostsProvider); - if (zenPings.isEmpty()) { - zenPings.registerExtension(UnicastZenPing.class); - } - zenPings.bind(binder()); } bind(Discovery.class).to(discoveryClass).asEagerSingleton(); } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/FaultDetection.java b/core/src/main/java/org/elasticsearch/discovery/zen/FaultDetection.java index f1f8b28ad09..715e8be03ef 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/FaultDetection.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/FaultDetection.java @@ -19,6 +19,8 @@ package org.elasticsearch.discovery.zen; +import java.io.Closeable; + import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; @@ -36,7 +38,7 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; * A base class for {@link MasterFaultDetection} & {@link NodesFaultDetection}, * making sure both use the same setting. */ -public abstract class FaultDetection extends AbstractComponent { +public abstract class FaultDetection extends AbstractComponent implements Closeable { public static final Setting CONNECT_ON_NETWORK_DISCONNECT_SETTING = Setting.boolSetting("discovery.zen.fd.connect_on_network_disconnect", false, Property.NodeScope); @@ -80,6 +82,7 @@ public abstract class FaultDetection extends AbstractComponent { } } + @Override public void close() { transportService.removeConnectionListener(connectionListener); } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index bd8e2353f72..f6870cc05b6 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -19,6 +19,26 @@ package org.elasticsearch.discovery.zen; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; @@ -30,8 +50,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.UUIDs; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Setting; @@ -56,34 +75,13 @@ import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; - import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; import static org.elasticsearch.discovery.zen.ZenPing.PingResponse.readPingResponse; -public class UnicastZenPing extends AbstractLifecycleComponent implements ZenPing { +public class UnicastZenPing extends AbstractComponent implements ZenPing { public static final String ACTION_NAME = "internal:discovery/zen/unicast"; public static final Setting> DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING = @@ -125,15 +123,13 @@ public class UnicastZenPing extends AbstractLifecycleComponent implements ZenPin private volatile boolean closed = false; - @Inject public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, - UnicastHostsProvider unicastHostsProviders) { + UnicastHostsProvider unicastHostsProvider) { super(settings); this.threadPool = threadPool; this.transportService = transportService; this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); - - this.hostsProvider = unicastHostsProviders; + this.hostsProvider = unicastHostsProvider; this.concurrentConnects = DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings); List hosts = DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings); @@ -190,26 +186,14 @@ public class UnicastZenPing extends AbstractLifecycleComponent implements ZenPin } @Override - protected void doStart() { - } - - @Override - protected void doStop() { - } - - @Override - protected void doClose() { + public void close() throws IOException { ThreadPool.terminate(unicastConnectExecutor, 0, TimeUnit.SECONDS); - try { - IOUtils.close(receivedResponses.values()); - } catch (IOException e) { - throw new ElasticsearchException("Error wile closing send ping handlers", e); - } + IOUtils.close(receivedResponses.values()); closed = true; } @Override - public void setPingContextProvider(PingContextProvider contextProvider) { + public void start(PingContextProvider contextProvider) { this.contextProvider = contextProvider; } @@ -501,9 +485,6 @@ public class UnicastZenPing extends AbstractLifecycleComponent implements ZenPin } private UnicastPingResponse handlePingRequest(final UnicastPingRequest request) { - if (!lifecycle.started()) { - throw new IllegalStateException("received ping request while not started"); - } temporalResponses.add(request.pingResponse); threadPool.schedule(TimeValue.timeValueMillis(request.timeout.millis() * 2), ThreadPool.Names.SAME, new Runnable() { @Override diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index a50ebab4c00..90e7d3e2144 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -22,6 +22,7 @@ package org.elasticsearch.discovery.zen; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; +import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -67,6 +68,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -105,7 +107,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover private AllocationService allocationService; private final ClusterName clusterName; private final DiscoverySettings discoverySettings; - private final ZenPingService pingService; + private final ZenPing zenPing; private final MasterFaultDetection masterFD; private final NodesFaultDetection nodesFD; private final PublishClusterStateAction publishClusterState; @@ -137,18 +139,16 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover private volatile NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor; @Inject - public ZenDiscovery(Settings settings, ThreadPool threadPool, - TransportService transportService, final ClusterService clusterService, ClusterSettings clusterSettings, - ZenPingService pingService) { + public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService transportService, + ClusterService clusterService, ClusterSettings clusterSettings, ZenPing zenPing) { super(settings); this.clusterService = clusterService; this.clusterName = clusterService.getClusterName(); this.transportService = transportService; this.discoverySettings = new DiscoverySettings(settings, clusterSettings); - this.pingService = pingService; + this.zenPing = zenPing; this.electMaster = new ElectMasterService(settings); this.pingTimeout = PING_TIMEOUT_SETTING.get(settings); - this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings); this.joinRetryAttempts = JOIN_RETRY_ATTEMPTS_SETTING.get(settings); this.joinRetryDelay = JOIN_RETRY_DELAY_SETTING.get(settings); @@ -171,7 +171,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, clusterService); this.masterFD.addListener(new MasterNodeFailureListener()); - this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, clusterService.getClusterName()); this.nodesFD.addListener(new NodeFaultDetectionListener()); @@ -183,9 +182,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover new NewPendingClusterStateListener(), discoverySettings, clusterService.getClusterName()); - this.pingService.setPingContextProvider(this); this.membership = new MembershipAction(settings, transportService, this, new MembershipListener()); - this.joinThreadControl = new JoinThreadControl(threadPool); transportService.registerRequestHandler( @@ -201,7 +198,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover protected void doStart() { nodesFD.setLocalNode(clusterService.localNode()); joinThreadControl.start(); - pingService.start(); + zenPing.start(this); this.nodeJoinController = new NodeJoinController(clusterService, allocationService, electMaster, discoverySettings, settings); this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, electMaster, this::rejoin, logger); } @@ -233,7 +230,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover @Override protected void doStop() { joinThreadControl.stop(); - pingService.stop(); masterFD.stop("zen disco stop"); nodesFD.stop(); DiscoveryNodes nodes = nodes(); @@ -264,10 +260,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover } @Override - protected void doClose() { - masterFD.close(); - nodesFD.close(); - pingService.close(); + protected void doClose() throws IOException { + IOUtils.close(masterFD, nodesFD, zenPing); } @Override @@ -871,7 +865,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover private DiscoveryNode findMaster() { logger.trace("starting to ping"); - List fullPingResponses = pingService.pingAndWait(pingTimeout).toList(); + List fullPingResponses = pingAndWait(pingTimeout).toList(); if (fullPingResponses == null) { logger.trace("No full ping responses"); return null; @@ -1013,6 +1007,28 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover } } + private ZenPing.PingCollection pingAndWait(TimeValue timeout) { + final ZenPing.PingCollection response = new ZenPing.PingCollection(); + final CountDownLatch latch = new CountDownLatch(1); + try { + zenPing.ping(pings -> { + response.addPings(pings); + latch.countDown(); + }, timeout); + } catch (Exception ex) { + logger.warn("Ping execution failed", ex); + latch.countDown(); + } + + try { + latch.await(); + return response; + } catch (InterruptedException e) { + logger.trace("pingAndWait interrupted"); + return response; + } + } + private class NewPendingClusterStateListener implements PublishClusterStateAction.NewPendingClusterStateListener { @Override diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java index be1df88d334..cb2c8cb5019 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenPing.java @@ -19,15 +19,7 @@ package org.elasticsearch.discovery.zen; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.component.LifecycleComponent; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; -import org.elasticsearch.common.unit.TimeValue; - +import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -36,11 +28,19 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.unit.TimeValue; + import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; -public interface ZenPing extends LifecycleComponent { +public interface ZenPing extends Closeable { - void setPingContextProvider(PingContextProvider contextProvider); + void start(PingContextProvider contextProvider); void ping(PingListener listener, TimeValue timeout); diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenPingService.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenPingService.java deleted file mode 100644 index 3aa3017f549..00000000000 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenPingService.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.discovery.zen; - -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; - -public class ZenPingService extends AbstractLifecycleComponent { - - private List zenPings = Collections.emptyList(); - - @Inject - public ZenPingService(Settings settings, Set zenPings) { - super(settings); - this.zenPings = Collections.unmodifiableList(new ArrayList<>(zenPings)); - } - - public List zenPings() { - return this.zenPings; - } - - public void setPingContextProvider(PingContextProvider contextProvider) { - if (lifecycle.started()) { - throw new IllegalStateException("Can't set nodes provider when started"); - } - for (ZenPing zenPing : zenPings) { - zenPing.setPingContextProvider(contextProvider); - } - } - - @Override - protected void doStart() { - for (ZenPing zenPing : zenPings) { - zenPing.start(); - } - } - - @Override - protected void doStop() { - for (ZenPing zenPing : zenPings) { - zenPing.stop(); - } - } - - @Override - protected void doClose() { - for (ZenPing zenPing : zenPings) { - zenPing.close(); - } - } - - public ZenPing.PingCollection pingAndWait(TimeValue timeout) { - final ZenPing.PingCollection response = new ZenPing.PingCollection(); - final CountDownLatch latch = new CountDownLatch(zenPings.size()); - for (ZenPing zenPing : zenPings) { - final AtomicBoolean counted = new AtomicBoolean(); - try { - zenPing.ping(pings -> { - response.addPings(pings); - if (counted.compareAndSet(false, true)) { - latch.countDown(); - } - }, timeout); - } catch (Exception ex) { - logger.warn("Ping execution failed", ex); - if (counted.compareAndSet(false, true)) { - latch.countDown(); - } - } - } - try { - latch.await(); - return response; - } catch (InterruptedException e) { - logger.trace("pingAndWait interrupted"); - return response; - } - } -} diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index 8d76ae9cbc3..f5ad4ff8772 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -76,6 +76,9 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoverySettings; +import org.elasticsearch.discovery.zen.UnicastHostsProvider; +import org.elasticsearch.discovery.zen.UnicastZenPing; +import org.elasticsearch.discovery.zen.ZenPing; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.GatewayAllocator; @@ -319,7 +322,8 @@ public class Node implements Closeable { final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool); clusterService.add(scriptModule.getScriptService()); resourcesToClose.add(clusterService); - final TribeService tribeService = new TribeService(settings, clusterService, nodeEnvironment.nodeId(), classpathPlugins); + final TribeService tribeService = new TribeService(settings, clusterService, nodeEnvironment.nodeId(), + s -> newTribeClientNode(s, classpathPlugins)); resourcesToClose.add(tribeService); final IngestService ingestService = new IngestService(settings, threadPool, this.environment, scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class)); @@ -393,7 +397,10 @@ public class Node implements Closeable { b.bind(HttpServer.class).toProvider(Providers.of(null)); }; } - modules.add(new DiscoveryModule(this.settings, transportService, networkService, pluginsService.filterPlugins(DiscoveryPlugin.class))); + final DiscoveryModule discoveryModule = new DiscoveryModule(this.settings, transportService, networkService, + pluginsService.filterPlugins(DiscoveryPlugin.class)); + final ZenPing zenPing = newZenPing(settings, threadPool, transportService, discoveryModule.getHostsProvider()); + modules.add(discoveryModule); pluginsService.processModules(modules); modules.add(b -> { b.bind(IndicesQueriesRegistry.class).toInstance(searchModule.getQueryParserRegistry()); @@ -425,6 +432,7 @@ public class Node implements Closeable { b.bind(UpdateHelper.class).toInstance(new UpdateHelper(settings, scriptModule.getScriptService())); b.bind(MetaDataIndexUpgradeService.class).toInstance(new MetaDataIndexUpgradeService(settings, indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings())); + b.bind(ZenPing.class).toInstance(zenPing); { RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings()); processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings); @@ -881,4 +889,15 @@ public class Node implements Closeable { } return customNameResolvers; } + + /** Create a new ZenPing instance for use in zen discovery. */ + protected ZenPing newZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, + UnicastHostsProvider hostsProvider) { + return new UnicastZenPing(settings, threadPool, transportService, hostsProvider); + } + + /** Constructs an internal node used as a client into a cluster fronted by this tribe node. */ + protected Node newTribeClientNode(Settings settings, Collection> classpathPlugins) { + return new Node(new Environment(settings), classpathPlugins); + } } diff --git a/core/src/main/java/org/elasticsearch/tribe/TribeClientNode.java b/core/src/main/java/org/elasticsearch/tribe/TribeClientNode.java deleted file mode 100644 index d9520aef768..00000000000 --- a/core/src/main/java/org/elasticsearch/tribe/TribeClientNode.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.tribe; - -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.env.Environment; -import org.elasticsearch.node.Node; -import org.elasticsearch.plugins.Plugin; - -import java.util.Collection; - -/** - * An internal node that connects to a remove cluster, as part of a tribe node. - */ -class TribeClientNode extends Node { - TribeClientNode(Settings settings, Collection> classpathPlugins) { - super(new Environment(settings), classpathPlugins); - } -} diff --git a/core/src/main/java/org/elasticsearch/tribe/TribeService.java b/core/src/main/java/org/elasticsearch/tribe/TribeService.java index 7871a0a6f39..69ad77fc91e 100644 --- a/core/src/main/java/org/elasticsearch/tribe/TribeService.java +++ b/core/src/main/java/org/elasticsearch/tribe/TribeService.java @@ -185,7 +185,7 @@ public class TribeService extends AbstractLifecycleComponent { private final List nodes = new CopyOnWriteArrayList<>(); public TribeService(Settings settings, ClusterService clusterService, final String tribeNodeId, - Collection> classpathPlugins) { + Function clientNodeBuilder) { super(settings); this.clusterService = clusterService; Map nodesSettings = new HashMap<>(settings.getGroups("tribe", true)); @@ -193,7 +193,7 @@ public class TribeService extends AbstractLifecycleComponent { nodesSettings.remove("on_conflict"); // remove prefix settings that don't indicate a client for (Map.Entry entry : nodesSettings.entrySet()) { Settings clientSettings = buildClientSettings(entry.getKey(), tribeNodeId, settings, entry.getValue()); - nodes.add(new TribeClientNode(clientSettings, classpathPlugins)); + nodes.add(clientNodeBuilder.apply(clientSettings)); } this.blockIndicesMetadata = BLOCKS_METADATA_INDICES_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY); diff --git a/core/src/test/java/org/elasticsearch/common/util/ExtensionPointTests.java b/core/src/test/java/org/elasticsearch/common/util/ExtensionPointTests.java deleted file mode 100644 index 8fabbcc60ae..00000000000 --- a/core/src/test/java/org/elasticsearch/common/util/ExtensionPointTests.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.common.util; - -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; - -import org.elasticsearch.common.inject.Binder; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.inject.Injector; -import org.elasticsearch.common.inject.Module; -import org.elasticsearch.common.inject.ModulesBuilder; -import org.elasticsearch.test.ESTestCase; - -public class ExtensionPointTests extends ESTestCase { - - public void testClassSet() { - final ExtensionPoint.ClassSet allocationDeciders = new ExtensionPoint.ClassSet<>("test_class", TestBaseClass.class, Consumer.class); - allocationDeciders.registerExtension(TestImpl.class); - Injector injector = new ModulesBuilder().add(new Module() { - @Override - public void configure(Binder binder) { - allocationDeciders.bind(binder); - } - }).createInjector(); - assertEquals(1, TestImpl.instances.get()); - - } - - public static class TestBaseClass {} - - public static class Consumer { - @Inject - public Consumer(Set deciders, TestImpl other) { - // we require the TestImpl more than once to ensure it's bound as a singleton - } - } - - public static class TestImpl extends TestBaseClass { - static final AtomicInteger instances = new AtomicInteger(0); - - @Inject - public TestImpl() { - instances.incrementAndGet(); - } - } -} diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java index 6f0aeca9d77..28775defe45 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java @@ -76,8 +76,9 @@ public class DiscoveryModuleTests extends ModuleTestCase { public void testUnknownHostsProvider() { Settings settings = Settings.builder().put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "dne").build(); - DiscoveryModule module = new DiscoveryModule(settings, null, null, Collections.emptyList()); - assertBindingFailure(module, "Unknown zen hosts provider"); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> + new DiscoveryModule(settings, null, null, Collections.emptyList())); + assertEquals("Unknown zen hosts provider [dne]", e.getMessage()); } public void testDuplicateHostsProvider() { diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java index 0208664033c..bbde01a6686 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java @@ -56,7 +56,6 @@ import org.elasticsearch.discovery.zen.PublishClusterStateAction; import org.elasticsearch.discovery.zen.UnicastZenPing; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.discovery.zen.ZenPing; -import org.elasticsearch.discovery.zen.ZenPingService; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.indices.store.IndicesStoreIntegrationIT; import org.elasticsearch.monitor.jvm.HotThreads; @@ -195,12 +194,9 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { ensureStableCluster(numberOfNodes); // TODO: this is a temporary solution so that nodes will not base their reaction to a partition based on previous successful results - for (ZenPingService pingService : internalCluster().getInstances(ZenPingService.class)) { - for (ZenPing zenPing : pingService.zenPings()) { - if (zenPing instanceof UnicastZenPing) { - ((UnicastZenPing) zenPing).clearTemporalResponses(); - } - } + ZenPing zenPing = internalCluster().getInstance(ZenPing.class); + if (zenPing instanceof UnicastZenPing) { + ((UnicastZenPing) zenPing).clearTemporalResponses(); } return nodes; } @@ -858,10 +854,9 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { // Forcefully clean temporal response lists on all nodes. Otherwise the node in the unicast host list // includes all the other nodes that have pinged it and the issue doesn't manifest - for (ZenPingService pingService : internalCluster().getInstances(ZenPingService.class)) { - for (ZenPing zenPing : pingService.zenPings()) { - ((UnicastZenPing) zenPing).clearTemporalResponses(); - } + ZenPing zenPing = internalCluster().getInstance(ZenPing.class); + if (zenPing instanceof UnicastZenPing) { + ((UnicastZenPing) zenPing).clearTemporalResponses(); } // Simulate a network issue between the unlucky node and elected master node in both directions. @@ -896,10 +891,9 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { // Forcefully clean temporal response lists on all nodes. Otherwise the node in the unicast host list // includes all the other nodes that have pinged it and the issue doesn't manifest - for (ZenPingService pingService : internalCluster().getInstances(ZenPingService.class)) { - for (ZenPing zenPing : pingService.zenPings()) { - ((UnicastZenPing) zenPing).clearTemporalResponses(); - } + ZenPing zenPing = internalCluster().getInstance(ZenPing.class); + if (zenPing instanceof UnicastZenPing) { + ((UnicastZenPing) zenPing).clearTemporalResponses(); } // Simulate a network issue between the unicast target node and the rest of the cluster diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java index 106612f22e0..4294bdd3dd4 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.discovery.zen; +import org.apache.lucene.util.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -33,10 +34,6 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.discovery.zen.ElectMasterService; -import org.elasticsearch.discovery.zen.UnicastZenPing; -import org.elasticsearch.discovery.zen.PingContextProvider; -import org.elasticsearch.discovery.zen.ZenPing; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; @@ -47,6 +44,7 @@ import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportSettings; +import java.io.IOException; import java.net.InetSocketAddress; import java.util.Collection; import java.util.Collections; @@ -62,7 +60,7 @@ import static org.hamcrest.Matchers.greaterThan; public class UnicastZenPingTests extends ESTestCase { private static final UnicastHostsProvider EMPTY_HOSTS_PROVIDER = Collections::emptyList; - public void testSimplePings() throws InterruptedException { + public void testSimplePings() throws IOException, InterruptedException { int startPort = 11000 + randomIntBetween(0, 1000); int endPort = startPort + 10; Settings settings = Settings.builder() @@ -97,7 +95,7 @@ public class UnicastZenPingTests extends ESTestCase { Settings hostsSettingsMismatch = Settings.builder().put(hostsSettings).put(settingsMismatch).build(); UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, handleA.transportService, EMPTY_HOSTS_PROVIDER); - zenPingA.setPingContextProvider(new PingContextProvider() { + zenPingA.start(new PingContextProvider() { @Override public DiscoveryNodes nodes() { return DiscoveryNodes.builder().add(handleA.node).localNodeId("UZP_A").build(); @@ -108,10 +106,9 @@ public class UnicastZenPingTests extends ESTestCase { return ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)).build(); } }); - zenPingA.start(); UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, handleB.transportService, EMPTY_HOSTS_PROVIDER); - zenPingB.setPingContextProvider(new PingContextProvider() { + zenPingB.start(new PingContextProvider() { @Override public DiscoveryNodes nodes() { return DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B").build(); @@ -122,7 +119,6 @@ public class UnicastZenPingTests extends ESTestCase { return state; } }); - zenPingB.start(); UnicastZenPing zenPingC = new UnicastZenPing(hostsSettingsMismatch, threadPool, handleC.transportService, EMPTY_HOSTS_PROVIDER) { @Override @@ -130,7 +126,7 @@ public class UnicastZenPingTests extends ESTestCase { return versionD; } }; - zenPingC.setPingContextProvider(new PingContextProvider() { + zenPingC.start(new PingContextProvider() { @Override public DiscoveryNodes nodes() { return DiscoveryNodes.builder().add(handleC.node).localNodeId("UZP_C").build(); @@ -141,10 +137,9 @@ public class UnicastZenPingTests extends ESTestCase { return state; } }); - zenPingC.start(); UnicastZenPing zenPingD = new UnicastZenPing(hostsSettingsMismatch, threadPool, handleD.transportService, EMPTY_HOSTS_PROVIDER); - zenPingD.setPingContextProvider(new PingContextProvider() { + zenPingD.start(new PingContextProvider() { @Override public DiscoveryNodes nodes() { return DiscoveryNodes.builder().add(handleD.node).localNodeId("UZP_D").build(); @@ -155,7 +150,6 @@ public class UnicastZenPingTests extends ESTestCase { return state; } }); - zenPingD.start(); try { logger.info("ping from UZP_A"); @@ -185,15 +179,12 @@ public class UnicastZenPingTests extends ESTestCase { assertThat(pingResponses.size(), equalTo(0)); assertCounters(handleD, handleA, handleB, handleC, handleD); } finally { - zenPingA.close(); - zenPingB.close(); - zenPingC.close(); - zenPingD.close(); - handleA.transportService.close(); - handleB.transportService.close(); - handleC.transportService.close(); - handleD.transportService.close(); - terminate(threadPool); + try { + IOUtils.close(zenPingA, zenPingB, zenPingC, zenPingD, + handleA.transportService, handleB.transportService, handleC.transportService, handleD.transportService); + } finally { + terminate(threadPool); + } } } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java index 8ef4751c9cf..88cf23fe938 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.zen.PublishClusterStateActionTests.AssertingAckListener; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.discovery.MockZenPing; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -269,8 +270,7 @@ public class ZenDiscoveryUnitTests extends ESTestCase { private ZenDiscovery buildZenDiscovery(Settings settings, TransportService service, ClusterService clusterService, ThreadPool threadPool) { ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - ZenPingService zenPingService = new ZenPingService(settings, Collections.emptySet()); - ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service, clusterService, clusterSettings, zenPingService); + ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service, clusterService, clusterSettings, new MockZenPing(settings)); zenDiscovery.start(); return zenDiscovery; } diff --git a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java index 0bc4974f285..6121b2c0c86 100644 --- a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java +++ b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java @@ -19,6 +19,18 @@ package org.elasticsearch.tribe; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.support.DestructiveOperations; import org.elasticsearch.client.Client; @@ -46,18 +58,6 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Predicate; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; - import static java.util.stream.Collectors.toSet; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; diff --git a/docs/plugins/discovery-file.asciidoc b/docs/plugins/discovery-file.asciidoc index a848cdd6ff1..e8e1e42f867 100644 --- a/docs/plugins/discovery-file.asciidoc +++ b/docs/plugins/discovery-file.asciidoc @@ -89,5 +89,5 @@ running on the default port: ---------------------------------------------------------------- Host names are allowed instead of IP addresses (similar to -`discovery.zen.ping.unicast.hosts`), and IPv6 addresses must be +`discovery.zen.ping.unicast.hosts`), and IPv6 addresses must be specified in brackets with the port coming after the brackets. diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/tribe/TribeUnitTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/tribe/TribeUnitTests.java index fd54c5fadbe..34621802f55 100644 --- a/qa/evil-tests/src/test/java/org/elasticsearch/tribe/TribeUnitTests.java +++ b/qa/evil-tests/src/test/java/org/elasticsearch/tribe/TribeUnitTests.java @@ -69,14 +69,14 @@ public class TribeUnitTests extends ESTestCase { .build(); final List> mockPlugins = Arrays.asList(MockTcpTransportPlugin.class, MockZenPing.TestPlugin.class); - tribe1 = new TribeClientNode( + tribe1 = new MockNode( Settings.builder() .put(baseSettings) .put("cluster.name", "tribe1") .put("node.name", "tribe1_node") .put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), random().nextLong()) .build(), mockPlugins).start(); - tribe2 = new TribeClientNode( + tribe2 = new MockNode( Settings.builder() .put(baseSettings) .put("cluster.name", "tribe2") diff --git a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java index 6eb28bea14a..38e8a8436b1 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java +++ b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java @@ -24,6 +24,8 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.discovery.zen.UnicastHostsProvider; +import org.elasticsearch.discovery.zen.ZenPing; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.recovery.RecoverySettings; @@ -33,6 +35,7 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.MockSearchService; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.fetch.FetchPhase; +import org.elasticsearch.test.discovery.MockZenPing; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; @@ -96,6 +99,21 @@ public class MockNode extends Node { } } + @Override + protected ZenPing newZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, + UnicastHostsProvider hostsProvider) { + if (getPluginsService().filterPlugins(MockZenPing.TestPlugin.class).isEmpty()) { + return super.newZenPing(settings, threadPool, transportService, hostsProvider); + } else { + return new MockZenPing(settings); + } + } + + @Override + protected Node newTribeClientNode(Settings settings, Collection> classpathPlugins) { + return new MockNode(settings, classpathPlugins); + } + @Override protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) { if (false == getPluginsService().filterPlugins(RecoverySettingsChunkSizePlugin.class).isEmpty()) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java index 3e02b9de0fb..d5e7de1d9bf 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/MockZenPing.java @@ -20,6 +20,7 @@ package org.elasticsearch.test.discovery; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -40,7 +41,10 @@ import java.util.stream.Collectors; * A {@link ZenPing} implementation which returns results based on an static in-memory map. This allows pinging * to be immediate and can be used to speed up tests. */ -public final class MockZenPing extends AbstractLifecycleComponent implements ZenPing { +public final class MockZenPing extends AbstractComponent implements ZenPing { + + /** A marker plugin used by {@link org.elasticsearch.node.MockNode} to indicate this mock zen ping should be used. */ + public static class TestPlugin extends Plugin {} static final Map> activeNodesPerCluster = ConcurrentCollections.newConcurrentMap(); @@ -52,8 +56,11 @@ public final class MockZenPing extends AbstractLifecycleComponent implements Zen } @Override - public void setPingContextProvider(PingContextProvider contextProvider) { + public void start(PingContextProvider contextProvider) { this.contextProvider = contextProvider; + assert contextProvider != null; + boolean added = getActiveNodesForCurrentCluster().add(this); + assert added; } @Override @@ -75,33 +82,14 @@ public final class MockZenPing extends AbstractLifecycleComponent implements Zen return new PingResponse(clusterState.nodes().getLocalNode(), clusterState.nodes().getMasterNode(), clusterState); } - @Override - protected void doStart() { - assert contextProvider != null; - boolean added = getActiveNodesForCurrentCluster().add(this); - assert added; - } - private Set getActiveNodesForCurrentCluster() { return activeNodesPerCluster.computeIfAbsent(getClusterName(), clusterName -> ConcurrentCollections.newConcurrentSet()); } @Override - protected void doStop() { + public void close() { boolean found = getActiveNodesForCurrentCluster().remove(this); assert found; } - - @Override - protected void doClose() { - - } - - public static class TestPlugin extends Plugin implements DiscoveryPlugin { - - public void onModule(DiscoveryModule discoveryModule) { - discoveryModule.addZenPing(MockZenPing.class); - } - } }