Remove pluggability of ZenPing (#21049)
Plugins: Remove pluggability of ZenPing ZenPing is the part of zen discovery which knows how to ping nodes. There is only one alternative implementation, which is just for testing. This change removes the ability to add custom zen pings, and instead hooks in the MockZenPing for tests through an overridden method in MockNode. This also folds in the ZenPingService (which was really just a single method) into ZenDiscovery, and removes the idea of having multiple ZenPing instances. Finally, this was the last usage of the ExtensionPoint classes, so that is also removed here.
This commit is contained in:
parent
7ec51d628d
commit
dc6ed7b8d4
|
@ -21,6 +21,7 @@ package org.elasticsearch.common.component;
|
|||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
|
@ -101,11 +102,17 @@ public abstract class AbstractLifecycleComponent extends AbstractComponent imple
|
|||
listener.beforeClose();
|
||||
}
|
||||
lifecycle.moveToClosed();
|
||||
doClose();
|
||||
try {
|
||||
doClose();
|
||||
} catch (IOException e) {
|
||||
// TODO: we need to separate out closing (ie shutting down) services, vs releasing runtime transient
|
||||
// structures. Shutting down services should use IOUtils.close
|
||||
logger.warn("failed to close " + getClass().getName(), e);
|
||||
}
|
||||
for (LifecycleListener listener : listeners) {
|
||||
listener.afterClose();
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract void doClose();
|
||||
protected abstract void doClose() throws IOException;
|
||||
}
|
||||
|
|
|
@ -1,250 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.util;
|
||||
|
||||
import org.elasticsearch.common.inject.Binder;
|
||||
import org.elasticsearch.common.inject.multibindings.MapBinder;
|
||||
import org.elasticsearch.common.inject.multibindings.Multibinder;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* This class defines an official elasticsearch extension point. It registers
|
||||
* all extensions by a single name and ensures that extensions are not registered
|
||||
* more than once.
|
||||
*/
|
||||
public abstract class ExtensionPoint {
|
||||
protected final String name;
|
||||
protected final Class<?>[] singletons;
|
||||
|
||||
/**
|
||||
* Creates a new extension point
|
||||
*
|
||||
* @param name the human readable underscore case name of the extension point. This is used in error messages etc.
|
||||
* @param singletons a list of singletons to bind with this extension point - these are bound in {@link #bind(Binder)}
|
||||
*/
|
||||
public ExtensionPoint(String name, Class<?>... singletons) {
|
||||
this.name = name;
|
||||
this.singletons = singletons;
|
||||
}
|
||||
|
||||
/**
|
||||
* Binds the extension as well as the singletons to the given guice binder.
|
||||
*
|
||||
* @param binder the binder to use
|
||||
*/
|
||||
public final void bind(Binder binder) {
|
||||
for (Class<?> c : singletons) {
|
||||
binder.bind(c).asEagerSingleton();
|
||||
}
|
||||
bindExtensions(binder);
|
||||
}
|
||||
|
||||
/**
|
||||
* Subclasses can bind their type, map or set extensions here.
|
||||
*/
|
||||
protected abstract void bindExtensions(Binder binder);
|
||||
|
||||
/**
|
||||
* A map based extension point which allows to register keyed implementations ie. parsers or some kind of strategies.
|
||||
*/
|
||||
public static class ClassMap<T> extends ExtensionPoint {
|
||||
protected final Class<T> extensionClass;
|
||||
protected final Map<String, Class<? extends T>> extensions = new HashMap<>();
|
||||
private final Set<String> reservedKeys;
|
||||
|
||||
/**
|
||||
* Creates a new {@link ClassMap}
|
||||
*
|
||||
* @param name the human readable underscore case name of the extension point. This is used in error messages etc.
|
||||
* @param extensionClass the base class that should be extended
|
||||
* @param singletons a list of singletons to bind with this extension point - these are bound in {@link #bind(Binder)}
|
||||
* @param reservedKeys a set of reserved keys by internal implementations
|
||||
*/
|
||||
public ClassMap(String name, Class<T> extensionClass, Set<String> reservedKeys, Class<?>... singletons) {
|
||||
super(name, singletons);
|
||||
this.extensionClass = extensionClass;
|
||||
this.reservedKeys = reservedKeys;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the extension for the given key or <code>null</code>
|
||||
*/
|
||||
public Class<? extends T> getExtension(String type) {
|
||||
return extensions.get(type);
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers an extension class for a given key. This method will thr
|
||||
*
|
||||
* @param key the extensions key
|
||||
* @param extension the extension
|
||||
* @throws IllegalArgumentException iff the key is already registered or if the key is a reserved key for an internal implementation
|
||||
*/
|
||||
public final void registerExtension(String key, Class<? extends T> extension) {
|
||||
if (extensions.containsKey(key) || reservedKeys.contains(key)) {
|
||||
throw new IllegalArgumentException("Can't register the same [" + this.name + "] more than once for [" + key + "]");
|
||||
}
|
||||
extensions.put(key, extension);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected final void bindExtensions(Binder binder) {
|
||||
MapBinder<String, T> parserMapBinder = MapBinder.newMapBinder(binder, String.class, extensionClass);
|
||||
for (Map.Entry<String, Class<? extends T>> clazz : extensions.entrySet()) {
|
||||
parserMapBinder.addBinding(clazz.getKey()).to(clazz.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A Type extension point which basically allows to registered keyed extensions like {@link ClassMap}
|
||||
* but doesn't instantiate and bind all the registered key value pairs but instead replace a singleton based on a given setting via {@link #bindType(Binder, Settings, String, String)}
|
||||
* Note: {@link #bind(Binder)} is not supported by this class
|
||||
*/
|
||||
public static final class SelectedType<T> extends ClassMap<T> {
|
||||
|
||||
public SelectedType(String name, Class<T> extensionClass) {
|
||||
super(name, extensionClass, Collections.emptySet());
|
||||
}
|
||||
|
||||
/**
|
||||
* Binds the extension class to the class that is registered for the give configured for the settings key in
|
||||
* the settings object.
|
||||
*
|
||||
* @param binder the binder to use
|
||||
* @param settings the settings to look up the key to find the implementation to bind
|
||||
* @param settingsKey the key to use with the settings
|
||||
* @param defaultValue the default value if the settings do not contain the key, or null if there is no default
|
||||
* @return the actual bound type key
|
||||
*/
|
||||
public String bindType(Binder binder, Settings settings, String settingsKey, String defaultValue) {
|
||||
final String type = settings.get(settingsKey, defaultValue);
|
||||
if (type == null) {
|
||||
throw new IllegalArgumentException("Missing setting [" + settingsKey + "]");
|
||||
}
|
||||
final Class<? extends T> instance = getExtension(type);
|
||||
if (instance == null) {
|
||||
throw new IllegalArgumentException("Unknown [" + this.name + "] type [" + type + "] possible values: "
|
||||
+ extensions.keySet());
|
||||
}
|
||||
if (extensionClass == instance) {
|
||||
binder.bind(extensionClass).asEagerSingleton();
|
||||
} else {
|
||||
binder.bind(extensionClass).to(instance).asEagerSingleton();
|
||||
}
|
||||
return type;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* A set based extension point which allows to register extended classes that might be used to chain additional functionality etc.
|
||||
*/
|
||||
public static final class ClassSet<T> extends ExtensionPoint {
|
||||
protected final Class<T> extensionClass;
|
||||
private final Set<Class<? extends T>> extensions = new HashSet<>();
|
||||
|
||||
/**
|
||||
* Creates a new {@link ClassSet}
|
||||
*
|
||||
* @param name the human readable underscore case name of the extension point. This is used in error messages etc.
|
||||
* @param extensionClass the base class that should be extended
|
||||
* @param singletons a list of singletons to bind with this extension point - these are bound in {@link #bind(Binder)}
|
||||
*/
|
||||
public ClassSet(String name, Class<T> extensionClass, Class<?>... singletons) {
|
||||
super(name, singletons);
|
||||
this.extensionClass = extensionClass;
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a new extension
|
||||
*
|
||||
* @param extension the extension to register
|
||||
* @throws IllegalArgumentException iff the class is already registered
|
||||
*/
|
||||
public void registerExtension(Class<? extends T> extension) {
|
||||
if (extensions.contains(extension)) {
|
||||
throw new IllegalArgumentException("Can't register the same [" + this.name + "] more than once for [" + extension.getName() + "]");
|
||||
}
|
||||
extensions.add(extension);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void bindExtensions(Binder binder) {
|
||||
Multibinder<T> allocationMultibinder = Multibinder.newSetBinder(binder, extensionClass);
|
||||
for (Class<? extends T> clazz : extensions) {
|
||||
binder.bind(clazz).asEagerSingleton();
|
||||
allocationMultibinder.addBinding().to(clazz);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
return extensions.isEmpty();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A an instance of a map, mapping one instance value to another. Both key and value are instances, not classes
|
||||
* like with other extension points.
|
||||
*/
|
||||
public static final class InstanceMap<K, V> extends ExtensionPoint {
|
||||
private final Map<K, V> map = new HashMap<>();
|
||||
private final Class<K> keyType;
|
||||
private final Class<V> valueType;
|
||||
|
||||
/**
|
||||
* Creates a new {@link ClassSet}
|
||||
*
|
||||
* @param name the human readable underscore case name of the extension point. This is used in error messages.
|
||||
* @param singletons a list of singletons to bind with this extension point - these are bound in {@link #bind(Binder)}
|
||||
*/
|
||||
public InstanceMap(String name, Class<K> keyType, Class<V> valueType, Class<?>... singletons) {
|
||||
super(name, singletons);
|
||||
this.keyType = keyType;
|
||||
this.valueType = valueType;
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a mapping from {@code key} to {@code value}
|
||||
*
|
||||
* @throws IllegalArgumentException iff the key is already registered
|
||||
*/
|
||||
public void registerExtension(K key, V value) {
|
||||
V old = map.put(key, value);
|
||||
if (old != null) {
|
||||
throw new IllegalArgumentException("Cannot register [" + this.name + "] with key [" + key + "] to [" + value + "], already registered to [" + old + "]");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void bindExtensions(Binder binder) {
|
||||
MapBinder<K, V> mapBinder = MapBinder.newMapBinder(binder, keyType, valueType);
|
||||
for (Map.Entry<K, V> entry : map.entrySet()) {
|
||||
mapBinder.addBinding(entry.getKey()).toInstance(entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,20 +19,6 @@
|
|||
|
||||
package org.elasticsearch.discovery;
|
||||
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.ExtensionPoint;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.plugins.DiscoveryPlugin;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.discovery.zen.ZenPing;
|
||||
import org.elasticsearch.discovery.zen.ZenPingService;
|
||||
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
|
||||
import org.elasticsearch.discovery.zen.UnicastZenPing;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
@ -41,6 +27,16 @@ import java.util.Objects;
|
|||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.plugins.DiscoveryPlugin;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
/**
|
||||
* A module for loading classes for node discovery.
|
||||
*/
|
||||
|
@ -52,8 +48,7 @@ public class DiscoveryModule extends AbstractModule {
|
|||
new Setting<>("discovery.zen.hosts_provider", DISCOVERY_TYPE_SETTING, Function.identity(), Property.NodeScope);
|
||||
|
||||
private final Settings settings;
|
||||
private final Map<String, Supplier<UnicastHostsProvider>> unicastHostProviders;
|
||||
private final ExtensionPoint.ClassSet<ZenPing> zenPings = new ExtensionPoint.ClassSet<>("zen_ping", ZenPing.class);
|
||||
private final UnicastHostsProvider hostsProvider;
|
||||
private final Map<String, Class<? extends Discovery>> discoveryTypes = new HashMap<>();
|
||||
|
||||
public DiscoveryModule(Settings settings, TransportService transportService, NetworkService networkService,
|
||||
|
@ -62,16 +57,30 @@ public class DiscoveryModule extends AbstractModule {
|
|||
addDiscoveryType("none", NoneDiscovery.class);
|
||||
addDiscoveryType("zen", ZenDiscovery.class);
|
||||
|
||||
Map<String, Supplier<UnicastHostsProvider>> hostProviders = new HashMap<>();
|
||||
hostProviders.put("zen", () -> Collections::emptyList);
|
||||
for (DiscoveryPlugin plugin : plugins) {
|
||||
plugin.getZenHostsProviders(transportService, networkService).entrySet().forEach(entry -> {
|
||||
if (hostProviders.put(entry.getKey(), entry.getValue()) != null) {
|
||||
throw new IllegalArgumentException("Cannot specify zen hosts provider [" + entry.getKey() + "] twice");
|
||||
}
|
||||
});
|
||||
String discoveryType = DISCOVERY_TYPE_SETTING.get(settings);
|
||||
if (discoveryType.equals("none") == false) {
|
||||
Map<String, Supplier<UnicastHostsProvider>> hostProviders = new HashMap<>();
|
||||
hostProviders.put("zen", () -> Collections::emptyList);
|
||||
for (DiscoveryPlugin plugin : plugins) {
|
||||
plugin.getZenHostsProviders(transportService, networkService).entrySet().forEach(entry -> {
|
||||
if (hostProviders.put(entry.getKey(), entry.getValue()) != null) {
|
||||
throw new IllegalArgumentException("Cannot specify zen hosts provider [" + entry.getKey() + "] twice");
|
||||
}
|
||||
});
|
||||
}
|
||||
String hostsProviderName = DISCOVERY_HOSTS_PROVIDER_SETTING.get(settings);
|
||||
Supplier<UnicastHostsProvider> hostsProviderSupplier = hostProviders.get(hostsProviderName);
|
||||
if (hostsProviderSupplier == null) {
|
||||
throw new IllegalArgumentException("Unknown zen hosts provider [" + hostsProviderName + "]");
|
||||
}
|
||||
hostsProvider = Objects.requireNonNull(hostsProviderSupplier.get());
|
||||
} else {
|
||||
hostsProvider = null;
|
||||
}
|
||||
unicastHostProviders = Collections.unmodifiableMap(hostProviders);
|
||||
}
|
||||
|
||||
public UnicastHostsProvider getHostsProvider() {
|
||||
return hostsProvider;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -84,10 +93,6 @@ public class DiscoveryModule extends AbstractModule {
|
|||
discoveryTypes.put(type, clazz);
|
||||
}
|
||||
|
||||
public void addZenPing(Class<? extends ZenPing> clazz) {
|
||||
zenPings.registerExtension(clazz);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void configure() {
|
||||
String discoveryType = DISCOVERY_TYPE_SETTING.get(settings);
|
||||
|
@ -97,18 +102,7 @@ public class DiscoveryModule extends AbstractModule {
|
|||
}
|
||||
|
||||
if (discoveryType.equals("none") == false) {
|
||||
bind(ZenPingService.class).asEagerSingleton();
|
||||
String hostsProviderName = DISCOVERY_HOSTS_PROVIDER_SETTING.get(settings);
|
||||
Supplier<UnicastHostsProvider> hostsProviderSupplier = unicastHostProviders.get(hostsProviderName);
|
||||
if (hostsProviderSupplier == null) {
|
||||
throw new IllegalArgumentException("Unknown zen hosts provider [" + hostsProviderName + "]");
|
||||
}
|
||||
UnicastHostsProvider hostsProvider = Objects.requireNonNull(hostsProviderSupplier.get());
|
||||
bind(UnicastHostsProvider.class).toInstance(hostsProvider);
|
||||
if (zenPings.isEmpty()) {
|
||||
zenPings.registerExtension(UnicastZenPing.class);
|
||||
}
|
||||
zenPings.bind(binder());
|
||||
}
|
||||
bind(Discovery.class).to(discoveryClass).asEagerSingleton();
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
|
||||
package org.elasticsearch.discovery.zen;
|
||||
|
||||
import java.io.Closeable;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
|
@ -36,7 +38,7 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
|
|||
* A base class for {@link MasterFaultDetection} & {@link NodesFaultDetection},
|
||||
* making sure both use the same setting.
|
||||
*/
|
||||
public abstract class FaultDetection extends AbstractComponent {
|
||||
public abstract class FaultDetection extends AbstractComponent implements Closeable {
|
||||
|
||||
public static final Setting<Boolean> CONNECT_ON_NETWORK_DISCONNECT_SETTING =
|
||||
Setting.boolSetting("discovery.zen.fd.connect_on_network_disconnect", false, Property.NodeScope);
|
||||
|
@ -80,6 +82,7 @@ public abstract class FaultDetection extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
transportService.removeConnectionListener(connectionListener);
|
||||
}
|
||||
|
|
|
@ -19,6 +19,26 @@
|
|||
|
||||
package org.elasticsearch.discovery.zen;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Function;
|
||||
|
||||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.logging.log4j.util.Supplier;
|
||||
|
@ -30,8 +50,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
|
@ -56,34 +75,13 @@ import org.elasticsearch.transport.TransportResponse;
|
|||
import org.elasticsearch.transport.TransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.emptySet;
|
||||
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
|
||||
import static org.elasticsearch.discovery.zen.ZenPing.PingResponse.readPingResponse;
|
||||
|
||||
public class UnicastZenPing extends AbstractLifecycleComponent implements ZenPing {
|
||||
public class UnicastZenPing extends AbstractComponent implements ZenPing {
|
||||
|
||||
public static final String ACTION_NAME = "internal:discovery/zen/unicast";
|
||||
public static final Setting<List<String>> DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING =
|
||||
|
@ -125,15 +123,13 @@ public class UnicastZenPing extends AbstractLifecycleComponent implements ZenPin
|
|||
|
||||
private volatile boolean closed = false;
|
||||
|
||||
@Inject
|
||||
public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
|
||||
UnicastHostsProvider unicastHostsProviders) {
|
||||
UnicastHostsProvider unicastHostsProvider) {
|
||||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
this.transportService = transportService;
|
||||
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
|
||||
|
||||
this.hostsProvider = unicastHostsProviders;
|
||||
this.hostsProvider = unicastHostsProvider;
|
||||
|
||||
this.concurrentConnects = DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);
|
||||
List<String> hosts = DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings);
|
||||
|
@ -190,26 +186,14 @@ public class UnicastZenPing extends AbstractLifecycleComponent implements ZenPin
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() {
|
||||
public void close() throws IOException {
|
||||
ThreadPool.terminate(unicastConnectExecutor, 0, TimeUnit.SECONDS);
|
||||
try {
|
||||
IOUtils.close(receivedResponses.values());
|
||||
} catch (IOException e) {
|
||||
throw new ElasticsearchException("Error wile closing send ping handlers", e);
|
||||
}
|
||||
IOUtils.close(receivedResponses.values());
|
||||
closed = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setPingContextProvider(PingContextProvider contextProvider) {
|
||||
public void start(PingContextProvider contextProvider) {
|
||||
this.contextProvider = contextProvider;
|
||||
}
|
||||
|
||||
|
@ -501,9 +485,6 @@ public class UnicastZenPing extends AbstractLifecycleComponent implements ZenPin
|
|||
}
|
||||
|
||||
private UnicastPingResponse handlePingRequest(final UnicastPingRequest request) {
|
||||
if (!lifecycle.started()) {
|
||||
throw new IllegalStateException("received ping request while not started");
|
||||
}
|
||||
temporalResponses.add(request.pingResponse);
|
||||
threadPool.schedule(TimeValue.timeValueMillis(request.timeout.millis() * 2), ThreadPool.Names.SAME, new Runnable() {
|
||||
@Override
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.discovery.zen;
|
|||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
|
@ -67,6 +68,7 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
@ -105,7 +107,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
private AllocationService allocationService;
|
||||
private final ClusterName clusterName;
|
||||
private final DiscoverySettings discoverySettings;
|
||||
private final ZenPingService pingService;
|
||||
private final ZenPing zenPing;
|
||||
private final MasterFaultDetection masterFD;
|
||||
private final NodesFaultDetection nodesFD;
|
||||
private final PublishClusterStateAction publishClusterState;
|
||||
|
@ -137,18 +139,16 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
private volatile NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;
|
||||
|
||||
@Inject
|
||||
public ZenDiscovery(Settings settings, ThreadPool threadPool,
|
||||
TransportService transportService, final ClusterService clusterService, ClusterSettings clusterSettings,
|
||||
ZenPingService pingService) {
|
||||
public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService transportService,
|
||||
ClusterService clusterService, ClusterSettings clusterSettings, ZenPing zenPing) {
|
||||
super(settings);
|
||||
this.clusterService = clusterService;
|
||||
this.clusterName = clusterService.getClusterName();
|
||||
this.transportService = transportService;
|
||||
this.discoverySettings = new DiscoverySettings(settings, clusterSettings);
|
||||
this.pingService = pingService;
|
||||
this.zenPing = zenPing;
|
||||
this.electMaster = new ElectMasterService(settings);
|
||||
this.pingTimeout = PING_TIMEOUT_SETTING.get(settings);
|
||||
|
||||
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
|
||||
this.joinRetryAttempts = JOIN_RETRY_ATTEMPTS_SETTING.get(settings);
|
||||
this.joinRetryDelay = JOIN_RETRY_DELAY_SETTING.get(settings);
|
||||
|
@ -171,7 +171,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
|
||||
this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, clusterService);
|
||||
this.masterFD.addListener(new MasterNodeFailureListener());
|
||||
|
||||
this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, clusterService.getClusterName());
|
||||
this.nodesFD.addListener(new NodeFaultDetectionListener());
|
||||
|
||||
|
@ -183,9 +182,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
new NewPendingClusterStateListener(),
|
||||
discoverySettings,
|
||||
clusterService.getClusterName());
|
||||
this.pingService.setPingContextProvider(this);
|
||||
this.membership = new MembershipAction(settings, transportService, this, new MembershipListener());
|
||||
|
||||
this.joinThreadControl = new JoinThreadControl(threadPool);
|
||||
|
||||
transportService.registerRequestHandler(
|
||||
|
@ -201,7 +198,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
protected void doStart() {
|
||||
nodesFD.setLocalNode(clusterService.localNode());
|
||||
joinThreadControl.start();
|
||||
pingService.start();
|
||||
zenPing.start(this);
|
||||
this.nodeJoinController = new NodeJoinController(clusterService, allocationService, electMaster, discoverySettings, settings);
|
||||
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, electMaster, this::rejoin, logger);
|
||||
}
|
||||
|
@ -233,7 +230,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
@Override
|
||||
protected void doStop() {
|
||||
joinThreadControl.stop();
|
||||
pingService.stop();
|
||||
masterFD.stop("zen disco stop");
|
||||
nodesFD.stop();
|
||||
DiscoveryNodes nodes = nodes();
|
||||
|
@ -264,10 +260,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() {
|
||||
masterFD.close();
|
||||
nodesFD.close();
|
||||
pingService.close();
|
||||
protected void doClose() throws IOException {
|
||||
IOUtils.close(masterFD, nodesFD, zenPing);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -871,7 +865,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
|
||||
private DiscoveryNode findMaster() {
|
||||
logger.trace("starting to ping");
|
||||
List<ZenPing.PingResponse> fullPingResponses = pingService.pingAndWait(pingTimeout).toList();
|
||||
List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();
|
||||
if (fullPingResponses == null) {
|
||||
logger.trace("No full ping responses");
|
||||
return null;
|
||||
|
@ -1013,6 +1007,28 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
|
|||
}
|
||||
}
|
||||
|
||||
private ZenPing.PingCollection pingAndWait(TimeValue timeout) {
|
||||
final ZenPing.PingCollection response = new ZenPing.PingCollection();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
try {
|
||||
zenPing.ping(pings -> {
|
||||
response.addPings(pings);
|
||||
latch.countDown();
|
||||
}, timeout);
|
||||
} catch (Exception ex) {
|
||||
logger.warn("Ping execution failed", ex);
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
try {
|
||||
latch.await();
|
||||
return response;
|
||||
} catch (InterruptedException e) {
|
||||
logger.trace("pingAndWait interrupted");
|
||||
return response;
|
||||
}
|
||||
}
|
||||
|
||||
private class NewPendingClusterStateListener implements PublishClusterStateAction.NewPendingClusterStateListener {
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,15 +19,7 @@
|
|||
|
||||
package org.elasticsearch.discovery.zen;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.component.LifecycleComponent;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
@ -36,11 +28,19 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
|
||||
|
||||
public interface ZenPing extends LifecycleComponent {
|
||||
public interface ZenPing extends Closeable {
|
||||
|
||||
void setPingContextProvider(PingContextProvider contextProvider);
|
||||
void start(PingContextProvider contextProvider);
|
||||
|
||||
void ping(PingListener listener, TimeValue timeout);
|
||||
|
||||
|
|
|
@ -1,105 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.discovery.zen;
|
||||
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public class ZenPingService extends AbstractLifecycleComponent {
|
||||
|
||||
private List<ZenPing> zenPings = Collections.emptyList();
|
||||
|
||||
@Inject
|
||||
public ZenPingService(Settings settings, Set<ZenPing> zenPings) {
|
||||
super(settings);
|
||||
this.zenPings = Collections.unmodifiableList(new ArrayList<>(zenPings));
|
||||
}
|
||||
|
||||
public List<ZenPing> zenPings() {
|
||||
return this.zenPings;
|
||||
}
|
||||
|
||||
public void setPingContextProvider(PingContextProvider contextProvider) {
|
||||
if (lifecycle.started()) {
|
||||
throw new IllegalStateException("Can't set nodes provider when started");
|
||||
}
|
||||
for (ZenPing zenPing : zenPings) {
|
||||
zenPing.setPingContextProvider(contextProvider);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
for (ZenPing zenPing : zenPings) {
|
||||
zenPing.start();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() {
|
||||
for (ZenPing zenPing : zenPings) {
|
||||
zenPing.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() {
|
||||
for (ZenPing zenPing : zenPings) {
|
||||
zenPing.close();
|
||||
}
|
||||
}
|
||||
|
||||
public ZenPing.PingCollection pingAndWait(TimeValue timeout) {
|
||||
final ZenPing.PingCollection response = new ZenPing.PingCollection();
|
||||
final CountDownLatch latch = new CountDownLatch(zenPings.size());
|
||||
for (ZenPing zenPing : zenPings) {
|
||||
final AtomicBoolean counted = new AtomicBoolean();
|
||||
try {
|
||||
zenPing.ping(pings -> {
|
||||
response.addPings(pings);
|
||||
if (counted.compareAndSet(false, true)) {
|
||||
latch.countDown();
|
||||
}
|
||||
}, timeout);
|
||||
} catch (Exception ex) {
|
||||
logger.warn("Ping execution failed", ex);
|
||||
if (counted.compareAndSet(false, true)) {
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
}
|
||||
try {
|
||||
latch.await();
|
||||
return response;
|
||||
} catch (InterruptedException e) {
|
||||
logger.trace("pingAndWait interrupted");
|
||||
return response;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -76,6 +76,9 @@ import org.elasticsearch.common.util.BigArrays;
|
|||
import org.elasticsearch.discovery.Discovery;
|
||||
import org.elasticsearch.discovery.DiscoveryModule;
|
||||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
|
||||
import org.elasticsearch.discovery.zen.UnicastZenPing;
|
||||
import org.elasticsearch.discovery.zen.ZenPing;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.gateway.GatewayAllocator;
|
||||
|
@ -319,7 +322,8 @@ public class Node implements Closeable {
|
|||
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
|
||||
clusterService.add(scriptModule.getScriptService());
|
||||
resourcesToClose.add(clusterService);
|
||||
final TribeService tribeService = new TribeService(settings, clusterService, nodeEnvironment.nodeId(), classpathPlugins);
|
||||
final TribeService tribeService = new TribeService(settings, clusterService, nodeEnvironment.nodeId(),
|
||||
s -> newTribeClientNode(s, classpathPlugins));
|
||||
resourcesToClose.add(tribeService);
|
||||
final IngestService ingestService = new IngestService(settings, threadPool, this.environment,
|
||||
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
|
||||
|
@ -393,7 +397,10 @@ public class Node implements Closeable {
|
|||
b.bind(HttpServer.class).toProvider(Providers.of(null));
|
||||
};
|
||||
}
|
||||
modules.add(new DiscoveryModule(this.settings, transportService, networkService, pluginsService.filterPlugins(DiscoveryPlugin.class)));
|
||||
final DiscoveryModule discoveryModule = new DiscoveryModule(this.settings, transportService, networkService,
|
||||
pluginsService.filterPlugins(DiscoveryPlugin.class));
|
||||
final ZenPing zenPing = newZenPing(settings, threadPool, transportService, discoveryModule.getHostsProvider());
|
||||
modules.add(discoveryModule);
|
||||
pluginsService.processModules(modules);
|
||||
modules.add(b -> {
|
||||
b.bind(IndicesQueriesRegistry.class).toInstance(searchModule.getQueryParserRegistry());
|
||||
|
@ -425,6 +432,7 @@ public class Node implements Closeable {
|
|||
b.bind(UpdateHelper.class).toInstance(new UpdateHelper(settings, scriptModule.getScriptService()));
|
||||
b.bind(MetaDataIndexUpgradeService.class).toInstance(new MetaDataIndexUpgradeService(settings,
|
||||
indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings()));
|
||||
b.bind(ZenPing.class).toInstance(zenPing);
|
||||
{
|
||||
RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
|
||||
processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
|
||||
|
@ -881,4 +889,15 @@ public class Node implements Closeable {
|
|||
}
|
||||
return customNameResolvers;
|
||||
}
|
||||
|
||||
/** Create a new ZenPing instance for use in zen discovery. */
|
||||
protected ZenPing newZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
|
||||
UnicastHostsProvider hostsProvider) {
|
||||
return new UnicastZenPing(settings, threadPool, transportService, hostsProvider);
|
||||
}
|
||||
|
||||
/** Constructs an internal node used as a client into a cluster fronted by this tribe node. */
|
||||
protected Node newTribeClientNode(Settings settings, Collection<Class<? extends Plugin>> classpathPlugins) {
|
||||
return new Node(new Environment(settings), classpathPlugins);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,36 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.tribe;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
* An internal node that connects to a remove cluster, as part of a tribe node.
|
||||
*/
|
||||
class TribeClientNode extends Node {
|
||||
TribeClientNode(Settings settings, Collection<Class<? extends Plugin>> classpathPlugins) {
|
||||
super(new Environment(settings), classpathPlugins);
|
||||
}
|
||||
}
|
|
@ -185,7 +185,7 @@ public class TribeService extends AbstractLifecycleComponent {
|
|||
private final List<Node> nodes = new CopyOnWriteArrayList<>();
|
||||
|
||||
public TribeService(Settings settings, ClusterService clusterService, final String tribeNodeId,
|
||||
Collection<Class<? extends Plugin>> classpathPlugins) {
|
||||
Function<Settings, Node> clientNodeBuilder) {
|
||||
super(settings);
|
||||
this.clusterService = clusterService;
|
||||
Map<String, Settings> nodesSettings = new HashMap<>(settings.getGroups("tribe", true));
|
||||
|
@ -193,7 +193,7 @@ public class TribeService extends AbstractLifecycleComponent {
|
|||
nodesSettings.remove("on_conflict"); // remove prefix settings that don't indicate a client
|
||||
for (Map.Entry<String, Settings> entry : nodesSettings.entrySet()) {
|
||||
Settings clientSettings = buildClientSettings(entry.getKey(), tribeNodeId, settings, entry.getValue());
|
||||
nodes.add(new TribeClientNode(clientSettings, classpathPlugins));
|
||||
nodes.add(clientNodeBuilder.apply(clientSettings));
|
||||
}
|
||||
|
||||
this.blockIndicesMetadata = BLOCKS_METADATA_INDICES_SETTING.get(settings).toArray(Strings.EMPTY_ARRAY);
|
||||
|
|
|
@ -1,63 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.common.util;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.elasticsearch.common.inject.Binder;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.inject.Injector;
|
||||
import org.elasticsearch.common.inject.Module;
|
||||
import org.elasticsearch.common.inject.ModulesBuilder;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
public class ExtensionPointTests extends ESTestCase {
|
||||
|
||||
public void testClassSet() {
|
||||
final ExtensionPoint.ClassSet<TestBaseClass> allocationDeciders = new ExtensionPoint.ClassSet<>("test_class", TestBaseClass.class, Consumer.class);
|
||||
allocationDeciders.registerExtension(TestImpl.class);
|
||||
Injector injector = new ModulesBuilder().add(new Module() {
|
||||
@Override
|
||||
public void configure(Binder binder) {
|
||||
allocationDeciders.bind(binder);
|
||||
}
|
||||
}).createInjector();
|
||||
assertEquals(1, TestImpl.instances.get());
|
||||
|
||||
}
|
||||
|
||||
public static class TestBaseClass {}
|
||||
|
||||
public static class Consumer {
|
||||
@Inject
|
||||
public Consumer(Set<TestBaseClass> deciders, TestImpl other) {
|
||||
// we require the TestImpl more than once to ensure it's bound as a singleton
|
||||
}
|
||||
}
|
||||
|
||||
public static class TestImpl extends TestBaseClass {
|
||||
static final AtomicInteger instances = new AtomicInteger(0);
|
||||
|
||||
@Inject
|
||||
public TestImpl() {
|
||||
instances.incrementAndGet();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -76,8 +76,9 @@ public class DiscoveryModuleTests extends ModuleTestCase {
|
|||
|
||||
public void testUnknownHostsProvider() {
|
||||
Settings settings = Settings.builder().put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "dne").build();
|
||||
DiscoveryModule module = new DiscoveryModule(settings, null, null, Collections.emptyList());
|
||||
assertBindingFailure(module, "Unknown zen hosts provider");
|
||||
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
|
||||
new DiscoveryModule(settings, null, null, Collections.emptyList()));
|
||||
assertEquals("Unknown zen hosts provider [dne]", e.getMessage());
|
||||
}
|
||||
|
||||
public void testDuplicateHostsProvider() {
|
||||
|
|
|
@ -56,7 +56,6 @@ import org.elasticsearch.discovery.zen.PublishClusterStateAction;
|
|||
import org.elasticsearch.discovery.zen.UnicastZenPing;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.discovery.zen.ZenPing;
|
||||
import org.elasticsearch.discovery.zen.ZenPingService;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.indices.store.IndicesStoreIntegrationIT;
|
||||
import org.elasticsearch.monitor.jvm.HotThreads;
|
||||
|
@ -195,12 +194,9 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|||
ensureStableCluster(numberOfNodes);
|
||||
|
||||
// TODO: this is a temporary solution so that nodes will not base their reaction to a partition based on previous successful results
|
||||
for (ZenPingService pingService : internalCluster().getInstances(ZenPingService.class)) {
|
||||
for (ZenPing zenPing : pingService.zenPings()) {
|
||||
if (zenPing instanceof UnicastZenPing) {
|
||||
((UnicastZenPing) zenPing).clearTemporalResponses();
|
||||
}
|
||||
}
|
||||
ZenPing zenPing = internalCluster().getInstance(ZenPing.class);
|
||||
if (zenPing instanceof UnicastZenPing) {
|
||||
((UnicastZenPing) zenPing).clearTemporalResponses();
|
||||
}
|
||||
return nodes;
|
||||
}
|
||||
|
@ -858,10 +854,9 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|||
|
||||
// Forcefully clean temporal response lists on all nodes. Otherwise the node in the unicast host list
|
||||
// includes all the other nodes that have pinged it and the issue doesn't manifest
|
||||
for (ZenPingService pingService : internalCluster().getInstances(ZenPingService.class)) {
|
||||
for (ZenPing zenPing : pingService.zenPings()) {
|
||||
((UnicastZenPing) zenPing).clearTemporalResponses();
|
||||
}
|
||||
ZenPing zenPing = internalCluster().getInstance(ZenPing.class);
|
||||
if (zenPing instanceof UnicastZenPing) {
|
||||
((UnicastZenPing) zenPing).clearTemporalResponses();
|
||||
}
|
||||
|
||||
// Simulate a network issue between the unlucky node and elected master node in both directions.
|
||||
|
@ -896,10 +891,9 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
|
|||
|
||||
// Forcefully clean temporal response lists on all nodes. Otherwise the node in the unicast host list
|
||||
// includes all the other nodes that have pinged it and the issue doesn't manifest
|
||||
for (ZenPingService pingService : internalCluster().getInstances(ZenPingService.class)) {
|
||||
for (ZenPing zenPing : pingService.zenPings()) {
|
||||
((UnicastZenPing) zenPing).clearTemporalResponses();
|
||||
}
|
||||
ZenPing zenPing = internalCluster().getInstance(ZenPing.class);
|
||||
if (zenPing instanceof UnicastZenPing) {
|
||||
((UnicastZenPing) zenPing).clearTemporalResponses();
|
||||
}
|
||||
|
||||
// Simulate a network issue between the unicast target node and the rest of the cluster
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.discovery.zen;
|
||||
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
|
@ -33,10 +34,6 @@ import org.elasticsearch.common.transport.TransportAddress;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.UnicastZenPing;
|
||||
import org.elasticsearch.discovery.zen.PingContextProvider;
|
||||
import org.elasticsearch.discovery.zen.ZenPing;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
|
@ -47,6 +44,7 @@ import org.elasticsearch.transport.TransportConnectionListener;
|
|||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.TransportSettings;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
@ -62,7 +60,7 @@ import static org.hamcrest.Matchers.greaterThan;
|
|||
public class UnicastZenPingTests extends ESTestCase {
|
||||
private static final UnicastHostsProvider EMPTY_HOSTS_PROVIDER = Collections::emptyList;
|
||||
|
||||
public void testSimplePings() throws InterruptedException {
|
||||
public void testSimplePings() throws IOException, InterruptedException {
|
||||
int startPort = 11000 + randomIntBetween(0, 1000);
|
||||
int endPort = startPort + 10;
|
||||
Settings settings = Settings.builder()
|
||||
|
@ -97,7 +95,7 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
|
||||
Settings hostsSettingsMismatch = Settings.builder().put(hostsSettings).put(settingsMismatch).build();
|
||||
UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, handleA.transportService, EMPTY_HOSTS_PROVIDER);
|
||||
zenPingA.setPingContextProvider(new PingContextProvider() {
|
||||
zenPingA.start(new PingContextProvider() {
|
||||
@Override
|
||||
public DiscoveryNodes nodes() {
|
||||
return DiscoveryNodes.builder().add(handleA.node).localNodeId("UZP_A").build();
|
||||
|
@ -108,10 +106,9 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
return ClusterState.builder(state).blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)).build();
|
||||
}
|
||||
});
|
||||
zenPingA.start();
|
||||
|
||||
UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, handleB.transportService, EMPTY_HOSTS_PROVIDER);
|
||||
zenPingB.setPingContextProvider(new PingContextProvider() {
|
||||
zenPingB.start(new PingContextProvider() {
|
||||
@Override
|
||||
public DiscoveryNodes nodes() {
|
||||
return DiscoveryNodes.builder().add(handleB.node).localNodeId("UZP_B").build();
|
||||
|
@ -122,7 +119,6 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
return state;
|
||||
}
|
||||
});
|
||||
zenPingB.start();
|
||||
|
||||
UnicastZenPing zenPingC = new UnicastZenPing(hostsSettingsMismatch, threadPool, handleC.transportService, EMPTY_HOSTS_PROVIDER) {
|
||||
@Override
|
||||
|
@ -130,7 +126,7 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
return versionD;
|
||||
}
|
||||
};
|
||||
zenPingC.setPingContextProvider(new PingContextProvider() {
|
||||
zenPingC.start(new PingContextProvider() {
|
||||
@Override
|
||||
public DiscoveryNodes nodes() {
|
||||
return DiscoveryNodes.builder().add(handleC.node).localNodeId("UZP_C").build();
|
||||
|
@ -141,10 +137,9 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
return state;
|
||||
}
|
||||
});
|
||||
zenPingC.start();
|
||||
|
||||
UnicastZenPing zenPingD = new UnicastZenPing(hostsSettingsMismatch, threadPool, handleD.transportService, EMPTY_HOSTS_PROVIDER);
|
||||
zenPingD.setPingContextProvider(new PingContextProvider() {
|
||||
zenPingD.start(new PingContextProvider() {
|
||||
@Override
|
||||
public DiscoveryNodes nodes() {
|
||||
return DiscoveryNodes.builder().add(handleD.node).localNodeId("UZP_D").build();
|
||||
|
@ -155,7 +150,6 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
return state;
|
||||
}
|
||||
});
|
||||
zenPingD.start();
|
||||
|
||||
try {
|
||||
logger.info("ping from UZP_A");
|
||||
|
@ -185,15 +179,12 @@ public class UnicastZenPingTests extends ESTestCase {
|
|||
assertThat(pingResponses.size(), equalTo(0));
|
||||
assertCounters(handleD, handleA, handleB, handleC, handleD);
|
||||
} finally {
|
||||
zenPingA.close();
|
||||
zenPingB.close();
|
||||
zenPingC.close();
|
||||
zenPingD.close();
|
||||
handleA.transportService.close();
|
||||
handleB.transportService.close();
|
||||
handleC.transportService.close();
|
||||
handleD.transportService.close();
|
||||
terminate(threadPool);
|
||||
try {
|
||||
IOUtils.close(zenPingA, zenPingB, zenPingC, zenPingD,
|
||||
handleA.transportService, handleB.transportService, handleC.transportService, handleD.transportService);
|
||||
} finally {
|
||||
terminate(threadPool);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.discovery.Discovery;
|
||||
import org.elasticsearch.discovery.zen.PublishClusterStateActionTests.AssertingAckListener;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.discovery.MockZenPing;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -269,8 +270,7 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
|
|||
|
||||
private ZenDiscovery buildZenDiscovery(Settings settings, TransportService service, ClusterService clusterService, ThreadPool threadPool) {
|
||||
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
ZenPingService zenPingService = new ZenPingService(settings, Collections.emptySet());
|
||||
ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service, clusterService, clusterSettings, zenPingService);
|
||||
ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service, clusterService, clusterSettings, new MockZenPing(settings));
|
||||
zenDiscovery.start();
|
||||
return zenDiscovery;
|
||||
}
|
||||
|
|
|
@ -19,6 +19,18 @@
|
|||
|
||||
package org.elasticsearch.tribe;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.support.DestructiveOperations;
|
||||
import org.elasticsearch.client.Client;
|
||||
|
@ -46,18 +58,6 @@ import org.junit.After;
|
|||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
import static java.util.stream.Collectors.toSet;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
|
|
|
@ -89,5 +89,5 @@ running on the default port:
|
|||
----------------------------------------------------------------
|
||||
|
||||
Host names are allowed instead of IP addresses (similar to
|
||||
`discovery.zen.ping.unicast.hosts`), and IPv6 addresses must be
|
||||
`discovery.zen.ping.unicast.hosts`), and IPv6 addresses must be
|
||||
specified in brackets with the port coming after the brackets.
|
||||
|
|
|
@ -69,14 +69,14 @@ public class TribeUnitTests extends ESTestCase {
|
|||
.build();
|
||||
|
||||
final List<Class<? extends Plugin>> mockPlugins = Arrays.asList(MockTcpTransportPlugin.class, MockZenPing.TestPlugin.class);
|
||||
tribe1 = new TribeClientNode(
|
||||
tribe1 = new MockNode(
|
||||
Settings.builder()
|
||||
.put(baseSettings)
|
||||
.put("cluster.name", "tribe1")
|
||||
.put("node.name", "tribe1_node")
|
||||
.put(NodeEnvironment.NODE_ID_SEED_SETTING.getKey(), random().nextLong())
|
||||
.build(), mockPlugins).start();
|
||||
tribe2 = new TribeClientNode(
|
||||
tribe2 = new MockNode(
|
||||
Settings.builder()
|
||||
.put(baseSettings)
|
||||
.put("cluster.name", "tribe2")
|
||||
|
|
|
@ -24,6 +24,8 @@ import org.elasticsearch.common.settings.ClusterSettings;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.MockBigArrays;
|
||||
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
|
||||
import org.elasticsearch.discovery.zen.ZenPing;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.indices.recovery.RecoverySettings;
|
||||
|
@ -33,6 +35,7 @@ import org.elasticsearch.script.ScriptService;
|
|||
import org.elasticsearch.search.MockSearchService;
|
||||
import org.elasticsearch.search.SearchService;
|
||||
import org.elasticsearch.search.fetch.FetchPhase;
|
||||
import org.elasticsearch.test.discovery.MockZenPing;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
|
@ -96,6 +99,21 @@ public class MockNode extends Node {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ZenPing newZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
|
||||
UnicastHostsProvider hostsProvider) {
|
||||
if (getPluginsService().filterPlugins(MockZenPing.TestPlugin.class).isEmpty()) {
|
||||
return super.newZenPing(settings, threadPool, transportService, hostsProvider);
|
||||
} else {
|
||||
return new MockZenPing(settings);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Node newTribeClientNode(Settings settings, Collection<Class<? extends Plugin>> classpathPlugins) {
|
||||
return new MockNode(settings, classpathPlugins);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) {
|
||||
if (false == getPluginsService().filterPlugins(RecoverySettingsChunkSizePlugin.class).isEmpty()) {
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.elasticsearch.test.discovery;
|
|||
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -40,7 +41,10 @@ import java.util.stream.Collectors;
|
|||
* A {@link ZenPing} implementation which returns results based on an static in-memory map. This allows pinging
|
||||
* to be immediate and can be used to speed up tests.
|
||||
*/
|
||||
public final class MockZenPing extends AbstractLifecycleComponent implements ZenPing {
|
||||
public final class MockZenPing extends AbstractComponent implements ZenPing {
|
||||
|
||||
/** A marker plugin used by {@link org.elasticsearch.node.MockNode} to indicate this mock zen ping should be used. */
|
||||
public static class TestPlugin extends Plugin {}
|
||||
|
||||
static final Map<ClusterName, Set<MockZenPing>> activeNodesPerCluster = ConcurrentCollections.newConcurrentMap();
|
||||
|
||||
|
@ -52,8 +56,11 @@ public final class MockZenPing extends AbstractLifecycleComponent implements Zen
|
|||
}
|
||||
|
||||
@Override
|
||||
public void setPingContextProvider(PingContextProvider contextProvider) {
|
||||
public void start(PingContextProvider contextProvider) {
|
||||
this.contextProvider = contextProvider;
|
||||
assert contextProvider != null;
|
||||
boolean added = getActiveNodesForCurrentCluster().add(this);
|
||||
assert added;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -75,33 +82,14 @@ public final class MockZenPing extends AbstractLifecycleComponent implements Zen
|
|||
return new PingResponse(clusterState.nodes().getLocalNode(), clusterState.nodes().getMasterNode(), clusterState);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
assert contextProvider != null;
|
||||
boolean added = getActiveNodesForCurrentCluster().add(this);
|
||||
assert added;
|
||||
}
|
||||
|
||||
private Set<MockZenPing> getActiveNodesForCurrentCluster() {
|
||||
return activeNodesPerCluster.computeIfAbsent(getClusterName(),
|
||||
clusterName -> ConcurrentCollections.newConcurrentSet());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() {
|
||||
public void close() {
|
||||
boolean found = getActiveNodesForCurrentCluster().remove(this);
|
||||
assert found;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() {
|
||||
|
||||
}
|
||||
|
||||
public static class TestPlugin extends Plugin implements DiscoveryPlugin {
|
||||
|
||||
public void onModule(DiscoveryModule discoveryModule) {
|
||||
discoveryModule.addZenPing(MockZenPing.class);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue