mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-25 17:38:44 +00:00
Plugins: Convert custom discovery to pull based plugin (#21398)
* Plugins: Convert custom discovery to pull based plugin This change primarily moves registering custom Discovery implementations to the pull based DiscoveryPlugin interface. It also keeps the cloud based discovery plugins re-registering ZenDiscovery under their own name in order to maintain backwards compatibility. However, discovery.zen.hosts_provider is changed here to no longer fallback to discovery.type. Instead, each plugin which previously relied on the value of discovery.type now sets the hosts_provider to itself if discovery.type is set to itself, along with a deprecation warning.
This commit is contained in:
parent
aec09a76d6
commit
4f5a934d92
@ -24,86 +24,82 @@ import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.discovery.zen.ZenPing;
|
||||
import org.elasticsearch.plugins.DiscoveryPlugin;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
/**
|
||||
* A module for loading classes for node discovery.
|
||||
*/
|
||||
public class DiscoveryModule extends AbstractModule {
|
||||
public class DiscoveryModule {
|
||||
|
||||
public static final Setting<String> DISCOVERY_TYPE_SETTING =
|
||||
new Setting<>("discovery.type", "zen", Function.identity(), Property.NodeScope);
|
||||
public static final Setting<String> DISCOVERY_HOSTS_PROVIDER_SETTING =
|
||||
new Setting<>("discovery.zen.hosts_provider", DISCOVERY_TYPE_SETTING, Function.identity(), Property.NodeScope);
|
||||
public static final Setting<Optional<String>> DISCOVERY_HOSTS_PROVIDER_SETTING =
|
||||
new Setting<>("discovery.zen.hosts_provider", (String)null, Optional::ofNullable, Property.NodeScope);
|
||||
|
||||
private final Settings settings;
|
||||
private final UnicastHostsProvider hostsProvider;
|
||||
private final Map<String, Class<? extends Discovery>> discoveryTypes = new HashMap<>();
|
||||
private final Discovery discovery;
|
||||
|
||||
public DiscoveryModule(Settings settings, TransportService transportService, NetworkService networkService,
|
||||
public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportService transportService, NetworkService networkService,
|
||||
ClusterService clusterService, Function<UnicastHostsProvider, ZenPing> createZenPing,
|
||||
List<DiscoveryPlugin> plugins) {
|
||||
this.settings = settings;
|
||||
addDiscoveryType("none", NoneDiscovery.class);
|
||||
addDiscoveryType("zen", ZenDiscovery.class);
|
||||
final UnicastHostsProvider hostsProvider;
|
||||
|
||||
String discoveryType = DISCOVERY_TYPE_SETTING.get(settings);
|
||||
if (discoveryType.equals("none") == false) {
|
||||
Map<String, Supplier<UnicastHostsProvider>> hostProviders = new HashMap<>();
|
||||
hostProviders.put("zen", () -> Collections::emptyList);
|
||||
for (DiscoveryPlugin plugin : plugins) {
|
||||
plugin.getZenHostsProviders(transportService, networkService).entrySet().forEach(entry -> {
|
||||
if (hostProviders.put(entry.getKey(), entry.getValue()) != null) {
|
||||
throw new IllegalArgumentException("Cannot specify zen hosts provider [" + entry.getKey() + "] twice");
|
||||
}
|
||||
});
|
||||
}
|
||||
String hostsProviderName = DISCOVERY_HOSTS_PROVIDER_SETTING.get(settings);
|
||||
Supplier<UnicastHostsProvider> hostsProviderSupplier = hostProviders.get(hostsProviderName);
|
||||
Map<String, Supplier<UnicastHostsProvider>> hostProviders = new HashMap<>();
|
||||
for (DiscoveryPlugin plugin : plugins) {
|
||||
plugin.getZenHostsProviders(transportService, networkService).entrySet().forEach(entry -> {
|
||||
if (hostProviders.put(entry.getKey(), entry.getValue()) != null) {
|
||||
throw new IllegalArgumentException("Cannot register zen hosts provider [" + entry.getKey() + "] twice");
|
||||
}
|
||||
});
|
||||
}
|
||||
Optional<String> hostsProviderName = DISCOVERY_HOSTS_PROVIDER_SETTING.get(settings);
|
||||
if (hostsProviderName.isPresent()) {
|
||||
Supplier<UnicastHostsProvider> hostsProviderSupplier = hostProviders.get(hostsProviderName.get());
|
||||
if (hostsProviderSupplier == null) {
|
||||
throw new IllegalArgumentException("Unknown zen hosts provider [" + hostsProviderName + "]");
|
||||
throw new IllegalArgumentException("Unknown zen hosts provider [" + hostsProviderName.get() + "]");
|
||||
}
|
||||
hostsProvider = Objects.requireNonNull(hostsProviderSupplier.get());
|
||||
} else {
|
||||
hostsProvider = null;
|
||||
hostsProvider = Collections::emptyList;
|
||||
}
|
||||
}
|
||||
|
||||
public UnicastHostsProvider getHostsProvider() {
|
||||
return hostsProvider;
|
||||
}
|
||||
final ZenPing zenPing = createZenPing.apply(hostsProvider);
|
||||
|
||||
/**
|
||||
* Adds a custom Discovery type.
|
||||
*/
|
||||
public void addDiscoveryType(String type, Class<? extends Discovery> clazz) {
|
||||
if (discoveryTypes.containsKey(type)) {
|
||||
throw new IllegalArgumentException("discovery type [" + type + "] is already registered");
|
||||
Map<String, Supplier<Discovery>> discoveryTypes = new HashMap<>();
|
||||
discoveryTypes.put("zen",
|
||||
() -> new ZenDiscovery(settings, threadPool, transportService, clusterService, clusterService.getClusterSettings(), zenPing));
|
||||
discoveryTypes.put("none", () -> new NoneDiscovery(settings, clusterService, clusterService.getClusterSettings()));
|
||||
for (DiscoveryPlugin plugin : plugins) {
|
||||
plugin.getDiscoveryTypes(threadPool, transportService, clusterService, zenPing).entrySet().forEach(entry -> {
|
||||
if (discoveryTypes.put(entry.getKey(), entry.getValue()) != null) {
|
||||
throw new IllegalArgumentException("Cannot register discovery type [" + entry.getKey() + "] twice");
|
||||
}
|
||||
});
|
||||
}
|
||||
discoveryTypes.put(type, clazz);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void configure() {
|
||||
String discoveryType = DISCOVERY_TYPE_SETTING.get(settings);
|
||||
Class<? extends Discovery> discoveryClass = discoveryTypes.get(discoveryType);
|
||||
if (discoveryClass == null) {
|
||||
throw new IllegalArgumentException("Unknown Discovery type [" + discoveryType + "]");
|
||||
Supplier<Discovery> discoverySupplier = discoveryTypes.get(discoveryType);
|
||||
if (discoverySupplier == null) {
|
||||
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
|
||||
}
|
||||
discovery = Objects.requireNonNull(discoverySupplier.get());
|
||||
}
|
||||
|
||||
if (discoveryType.equals("none") == false) {
|
||||
bind(UnicastHostsProvider.class).toInstance(hostsProvider);
|
||||
}
|
||||
bind(Discovery.class).to(discoveryClass).asEagerSingleton();
|
||||
public Discovery getDiscovery() {
|
||||
return discovery;
|
||||
}
|
||||
}
|
||||
|
@ -76,8 +76,10 @@ import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.discovery.Discovery;
|
||||
import org.elasticsearch.discovery.DiscoveryModule;
|
||||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.discovery.NoneDiscovery;
|
||||
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
|
||||
import org.elasticsearch.discovery.zen.UnicastZenPing;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.discovery.zen.ZenPing;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
@ -153,16 +155,22 @@ import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.function.UnaryOperator;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING;
|
||||
import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_TYPE_SETTING;
|
||||
|
||||
/**
|
||||
* A node represent a node within a cluster (<tt>cluster.name</tt>). The {@link #client()} can be used
|
||||
* in order to use a {@link Client} to perform actions/operations against the cluster.
|
||||
@ -397,10 +405,10 @@ public class Node implements Closeable {
|
||||
b.bind(HttpServer.class).toProvider(Providers.of(null));
|
||||
};
|
||||
}
|
||||
final DiscoveryModule discoveryModule = new DiscoveryModule(this.settings, transportService, networkService,
|
||||
|
||||
final DiscoveryModule discoveryModule = new DiscoveryModule(this.settings, threadPool, transportService,
|
||||
networkService, clusterService, hostsProvider -> newZenPing(settings, threadPool, transportService, hostsProvider),
|
||||
pluginsService.filterPlugins(DiscoveryPlugin.class));
|
||||
final ZenPing zenPing = newZenPing(settings, threadPool, transportService, discoveryModule.getHostsProvider());
|
||||
modules.add(discoveryModule);
|
||||
pluginsService.processModules(modules);
|
||||
modules.add(b -> {
|
||||
b.bind(IndicesQueriesRegistry.class).toInstance(searchModule.getQueryParserRegistry());
|
||||
@ -432,7 +440,7 @@ public class Node implements Closeable {
|
||||
b.bind(UpdateHelper.class).toInstance(new UpdateHelper(settings, scriptModule.getScriptService()));
|
||||
b.bind(MetaDataIndexUpgradeService.class).toInstance(new MetaDataIndexUpgradeService(settings,
|
||||
indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings()));
|
||||
b.bind(ZenPing.class).toInstance(zenPing);
|
||||
b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
|
||||
{
|
||||
RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
|
||||
processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
|
||||
|
@ -23,9 +23,13 @@ import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.discovery.Discovery;
|
||||
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
|
||||
import org.elasticsearch.discovery.zen.ZenPing;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
/**
|
||||
@ -42,6 +46,24 @@ import org.elasticsearch.transport.TransportService;
|
||||
* }</pre>
|
||||
*/
|
||||
public interface DiscoveryPlugin {
|
||||
|
||||
/**
|
||||
* Returns custom discovery implementations added by this plugin.
|
||||
*
|
||||
* The key of the returned map is the name of the discovery implementation
|
||||
* (see {@link org.elasticsearch.discovery.DiscoveryModule#DISCOVERY_TYPE_SETTING}, and
|
||||
* the value is a supplier to construct the {@link Discovery}.
|
||||
*
|
||||
* @param threadPool Use to schedule ping actions
|
||||
* @param transportService Use to communicate with other nodes
|
||||
* @param clusterService Use to find current nodes in the cluster
|
||||
* @param zenPing Use to ping other nodes with zen unicast host list
|
||||
*/
|
||||
default Map<String, Supplier<Discovery>> getDiscoveryTypes(ThreadPool threadPool, TransportService transportService,
|
||||
ClusterService clusterService, ZenPing zenPing) {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
/**
|
||||
* Override to add additional {@link NetworkService.CustomNameResolver}s.
|
||||
* This can be handy if you want to provide your own Network interface name like _mycard_
|
||||
|
@ -35,6 +35,7 @@ import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsModule;
|
||||
import org.elasticsearch.discovery.DiscoveryModule;
|
||||
import org.elasticsearch.index.IndexModule;
|
||||
import org.elasticsearch.indices.analysis.AnalysisModule;
|
||||
import org.elasticsearch.script.ScriptModule;
|
||||
@ -206,7 +207,7 @@ public abstract class Plugin {
|
||||
public final void onModule(ActionModule module) {}
|
||||
|
||||
/**
|
||||
* Old-style action extension point. {@code @Deprecated} and {@code final} to act as a signpost for plugin authors upgrading
|
||||
* Old-style search extension point. {@code @Deprecated} and {@code final} to act as a signpost for plugin authors upgrading
|
||||
* from 2.x.
|
||||
*
|
||||
* @deprecated implement {@link SearchPlugin} instead
|
||||
@ -215,11 +216,20 @@ public abstract class Plugin {
|
||||
public final void onModule(SearchModule module) {}
|
||||
|
||||
/**
|
||||
* Old-style action extension point. {@code @Deprecated} and {@code final} to act as a signpost for plugin authors upgrading
|
||||
* Old-style network extension point. {@code @Deprecated} and {@code final} to act as a signpost for plugin authors upgrading
|
||||
* from 2.x.
|
||||
*
|
||||
* @deprecated implement {@link NetworkPlugin} instead
|
||||
*/
|
||||
@Deprecated
|
||||
public final void onModule(NetworkModule module) {}
|
||||
|
||||
/**
|
||||
* Old-style discovery extension point. {@code @Deprecated} and {@code final} to act as a signpost for plugin authors upgrading
|
||||
* from 2.x.
|
||||
*
|
||||
* @deprecated implement {@link DiscoveryPlugin} instead
|
||||
*/
|
||||
@Deprecated
|
||||
public final void onModule(DiscoveryModule module) {}
|
||||
}
|
||||
|
@ -18,23 +18,38 @@
|
||||
*/
|
||||
package org.elasticsearch.discovery;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.elasticsearch.common.inject.ModuleTestCase;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.discovery.zen.ZenPing;
|
||||
import org.elasticsearch.plugins.DiscoveryPlugin;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.NoopDiscovery;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
public class DiscoveryModuleTests extends ModuleTestCase {
|
||||
public class DiscoveryModuleTests extends ESTestCase {
|
||||
|
||||
public interface DummyDiscoPlugin extends DiscoveryPlugin {
|
||||
private TransportService transportService;
|
||||
private ClusterService clusterService;
|
||||
|
||||
public interface DummyHostsProviderPlugin extends DiscoveryPlugin {
|
||||
Map<String, Supplier<UnicastHostsProvider>> impl();
|
||||
@Override
|
||||
default Map<String, Supplier<UnicastHostsProvider>> getZenHostsProviders(TransportService transportService,
|
||||
@ -43,49 +58,95 @@ public class DiscoveryModuleTests extends ModuleTestCase {
|
||||
}
|
||||
}
|
||||
|
||||
public void testRegisterDefaults() {
|
||||
Settings settings = Settings.EMPTY;
|
||||
DiscoveryModule module = new DiscoveryModule(settings, null, null, Collections.emptyList());
|
||||
assertBinding(module, Discovery.class, ZenDiscovery.class);
|
||||
public interface DummyDiscoveryPlugin extends DiscoveryPlugin {
|
||||
Map<String, Supplier<Discovery>> impl();
|
||||
@Override
|
||||
default Map<String, Supplier<Discovery>> getDiscoveryTypes(ThreadPool threadPool, TransportService transportService,
|
||||
ClusterService clusterService, ZenPing zenPing) {
|
||||
return impl();
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setupDummyServices() {
|
||||
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, null, null);
|
||||
clusterService = new ClusterService(Settings.EMPTY, clusterSettings, null);
|
||||
}
|
||||
|
||||
@After
|
||||
public void clearDummyServices() throws IOException {
|
||||
IOUtils.close(transportService, clusterService);
|
||||
transportService = null;
|
||||
clusterService = null;
|
||||
}
|
||||
|
||||
private DiscoveryModule newModule(Settings settings, Function<UnicastHostsProvider, ZenPing> createZenPing,
|
||||
List<DiscoveryPlugin> plugins) {
|
||||
return new DiscoveryModule(settings, null, transportService, null, clusterService, createZenPing, plugins);
|
||||
}
|
||||
|
||||
public void testDefaults() {
|
||||
DiscoveryModule module = newModule(Settings.EMPTY, hostsProvider -> null, Collections.emptyList());
|
||||
assertTrue(module.getDiscovery() instanceof ZenDiscovery);
|
||||
}
|
||||
|
||||
public void testLazyConstructionDiscovery() {
|
||||
DummyDiscoveryPlugin plugin = () -> Collections.singletonMap("custom",
|
||||
() -> { throw new AssertionError("created discovery type which was not selected"); });
|
||||
newModule(Settings.EMPTY, hostsProvider -> null, Collections.singletonList(plugin));
|
||||
}
|
||||
|
||||
public void testRegisterDiscovery() {
|
||||
Settings settings = Settings.builder().put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "custom").build();
|
||||
DummyDiscoPlugin plugin = () -> Collections.singletonMap("custom", () -> Collections::emptyList);
|
||||
DiscoveryModule module = new DiscoveryModule(settings, null, null, Collections.singletonList(plugin));
|
||||
module.addDiscoveryType("custom", NoopDiscovery.class);
|
||||
assertBinding(module, Discovery.class, NoopDiscovery.class);
|
||||
DummyDiscoveryPlugin plugin = () -> Collections.singletonMap("custom", NoopDiscovery::new);
|
||||
DiscoveryModule module = newModule(settings, hostsProvider -> null, Collections.singletonList(plugin));
|
||||
assertTrue(module.getDiscovery() instanceof NoopDiscovery);
|
||||
}
|
||||
|
||||
public void testUnknownDiscovery() {
|
||||
Settings settings = Settings.builder().put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "dne").build();
|
||||
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
|
||||
newModule(settings, hostsProvider -> null, Collections.emptyList()));
|
||||
assertEquals("Unknown discovery type [dne]", e.getMessage());
|
||||
}
|
||||
|
||||
public void testDuplicateDiscovery() {
|
||||
DummyDiscoveryPlugin plugin1 = () -> Collections.singletonMap("dup", () -> null);
|
||||
DummyDiscoveryPlugin plugin2 = () -> Collections.singletonMap("dup", () -> null);
|
||||
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
|
||||
newModule(Settings.EMPTY, hostsProvider -> null, Arrays.asList(plugin1, plugin2)));
|
||||
assertEquals("Cannot register discovery type [dup] twice", e.getMessage());
|
||||
}
|
||||
|
||||
public void testHostsProvider() {
|
||||
Settings settings = Settings.builder().put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "custom").build();
|
||||
final UnicastHostsProvider provider = Collections::emptyList;
|
||||
DummyDiscoPlugin plugin = () -> Collections.singletonMap("custom", () -> provider);
|
||||
DiscoveryModule module = new DiscoveryModule(settings, null, null, Collections.singletonList(plugin));
|
||||
assertInstanceBinding(module, UnicastHostsProvider.class, instance -> instance == provider);
|
||||
}
|
||||
|
||||
public void testHostsProviderBwc() {
|
||||
Settings settings = Settings.builder().put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "custom").build();
|
||||
final UnicastHostsProvider provider = Collections::emptyList;
|
||||
DummyDiscoPlugin plugin = () -> Collections.singletonMap("custom", () -> provider);
|
||||
DiscoveryModule module = new DiscoveryModule(settings, null, null, Collections.singletonList(plugin));
|
||||
module.addDiscoveryType("custom", NoopDiscovery.class);
|
||||
assertInstanceBinding(module, UnicastHostsProvider.class, instance -> instance == provider);
|
||||
DummyHostsProviderPlugin plugin = () -> Collections.singletonMap("custom", () -> provider);
|
||||
newModule(settings, hostsProvider -> {
|
||||
assertEquals(provider, hostsProvider);
|
||||
return null;
|
||||
}, Collections.singletonList(plugin));
|
||||
}
|
||||
|
||||
public void testUnknownHostsProvider() {
|
||||
Settings settings = Settings.builder().put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "dne").build();
|
||||
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
|
||||
new DiscoveryModule(settings, null, null, Collections.emptyList()));
|
||||
newModule(settings, hostsProvider -> null, Collections.emptyList()));
|
||||
assertEquals("Unknown zen hosts provider [dne]", e.getMessage());
|
||||
}
|
||||
|
||||
public void testDuplicateHostsProvider() {
|
||||
DummyDiscoPlugin plugin1 = () -> Collections.singletonMap("dup", () -> null);
|
||||
DummyDiscoPlugin plugin2 = () -> Collections.singletonMap("dup", () -> null);
|
||||
DummyHostsProviderPlugin plugin1 = () -> Collections.singletonMap("dup", () -> null);
|
||||
DummyHostsProviderPlugin plugin2 = () -> Collections.singletonMap("dup", () -> null);
|
||||
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
|
||||
new DiscoveryModule(Settings.EMPTY, null, null, Arrays.asList(plugin1, plugin2)));
|
||||
assertEquals("Cannot specify zen hosts provider [dup] twice", e.getMessage());
|
||||
newModule(Settings.EMPTY, hostsProvider -> null, Arrays.asList(plugin1, plugin2)));
|
||||
assertEquals("Cannot register zen hosts provider [dup] twice", e.getMessage());
|
||||
}
|
||||
|
||||
public void testLazyConstructionHostsProvider() {
|
||||
DummyHostsProviderPlugin plugin = () -> Collections.singletonMap("custom",
|
||||
() -> { throw new AssertionError("created hosts provider which was not selected"); });
|
||||
newModule(Settings.EMPTY, hostsProvider -> null, Collections.singletonList(plugin));
|
||||
}
|
||||
}
|
||||
|
@ -32,7 +32,9 @@ import com.microsoft.windowsazure.management.compute.models.HostedServiceGetDeta
|
||||
import com.microsoft.windowsazure.management.configuration.ManagementConfiguration;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.cloud.azure.classic.AzureServiceRemoteException;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
public class AzureComputeServiceImpl extends AbstractLifecycleComponent
|
||||
@ -43,11 +45,11 @@ public class AzureComputeServiceImpl extends AbstractLifecycleComponent
|
||||
|
||||
public AzureComputeServiceImpl(Settings settings) {
|
||||
super(settings);
|
||||
String subscriptionId = Management.SUBSCRIPTION_ID_SETTING.get(settings);
|
||||
String subscriptionId = getRequiredSetting(settings, Management.SUBSCRIPTION_ID_SETTING);
|
||||
|
||||
serviceName = Management.SERVICE_NAME_SETTING.get(settings);
|
||||
String keystorePath = Management.KEYSTORE_PATH_SETTING.get(settings);
|
||||
String keystorePassword = Management.KEYSTORE_PASSWORD_SETTING.get(settings);
|
||||
serviceName = getRequiredSetting(settings, Management.SERVICE_NAME_SETTING);
|
||||
String keystorePath = getRequiredSetting(settings, Management.KEYSTORE_PATH_SETTING);
|
||||
String keystorePassword = getRequiredSetting(settings, Management.KEYSTORE_PASSWORD_SETTING);
|
||||
KeyStoreType keystoreType = Management.KEYSTORE_TYPE_SETTING.get(settings);
|
||||
|
||||
logger.trace("creating new Azure client for [{}], [{}]", subscriptionId, serviceName);
|
||||
@ -77,6 +79,14 @@ public class AzureComputeServiceImpl extends AbstractLifecycleComponent
|
||||
}
|
||||
}
|
||||
|
||||
private static String getRequiredSetting(Settings settings, Setting<String> setting) {
|
||||
String value = setting.get(settings);
|
||||
if (value == null || Strings.hasLength(value) == false) {
|
||||
throw new IllegalArgumentException("Missing required setting " + setting.getKey() + " for azure");
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HostedServiceGetDetailedResponse getServiceDetails() {
|
||||
try {
|
||||
|
@ -29,29 +29,33 @@ import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.cloud.azure.classic.management.AzureComputeService;
|
||||
import org.elasticsearch.cloud.azure.classic.management.AzureComputeServiceImpl;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.logging.DeprecationLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.discovery.Discovery;
|
||||
import org.elasticsearch.discovery.DiscoveryModule;
|
||||
import org.elasticsearch.discovery.azure.classic.AzureUnicastHostsProvider;
|
||||
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.discovery.zen.ZenPing;
|
||||
import org.elasticsearch.plugins.DiscoveryPlugin;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
public class AzureDiscoveryPlugin extends Plugin implements DiscoveryPlugin {
|
||||
|
||||
public static final String AZURE = "azure";
|
||||
protected final Settings settings;
|
||||
protected final Logger logger = Loggers.getLogger(AzureDiscoveryPlugin.class);
|
||||
private static final Logger logger = Loggers.getLogger(AzureDiscoveryPlugin.class);
|
||||
private static final DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
|
||||
|
||||
public AzureDiscoveryPlugin(Settings settings) {
|
||||
this.settings = settings;
|
||||
DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
|
||||
deprecationLogger.deprecated("azure classic discovery plugin is deprecated. Use azure arm discovery plugin instead");
|
||||
logger.trace("starting azure classic discovery plugin...");
|
||||
}
|
||||
@ -68,10 +72,12 @@ public class AzureDiscoveryPlugin extends Plugin implements DiscoveryPlugin {
|
||||
() -> new AzureUnicastHostsProvider(settings, createComputeService(), transportService, networkService));
|
||||
}
|
||||
|
||||
public void onModule(DiscoveryModule discoveryModule) {
|
||||
if (isDiscoveryReady(settings, logger)) {
|
||||
discoveryModule.addDiscoveryType(AZURE, ZenDiscovery.class);
|
||||
}
|
||||
@Override
|
||||
public Map<String, Supplier<Discovery>> getDiscoveryTypes(ThreadPool threadPool, TransportService transportService,
|
||||
ClusterService clusterService, ZenPing zenPing) {
|
||||
// this is for backcompat with pre 5.1, where users would set discovery.type to use ec2 hosts provider
|
||||
return Collections.singletonMap(AZURE, () ->
|
||||
new ZenDiscovery(settings, threadPool, transportService, clusterService, clusterService.getClusterSettings(), zenPing));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -88,36 +94,19 @@ public class AzureDiscoveryPlugin extends Plugin implements DiscoveryPlugin {
|
||||
AzureComputeService.Discovery.ENDPOINT_NAME_SETTING);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if discovery is meant to start
|
||||
* @return true if we can start discovery features
|
||||
*/
|
||||
private static boolean isDiscoveryReady(Settings settings, Logger logger) {
|
||||
// User set discovery.type: azure
|
||||
if (!AzureDiscoveryPlugin.AZURE.equalsIgnoreCase(DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings))) {
|
||||
logger.trace("discovery.type not set to {}", AzureDiscoveryPlugin.AZURE);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (isDefined(settings, AzureComputeService.Management.SUBSCRIPTION_ID_SETTING) &&
|
||||
isDefined(settings, AzureComputeService.Management.SERVICE_NAME_SETTING) &&
|
||||
isDefined(settings, AzureComputeService.Management.KEYSTORE_PATH_SETTING) &&
|
||||
isDefined(settings, AzureComputeService.Management.KEYSTORE_PASSWORD_SETTING)) {
|
||||
logger.trace("All required properties for Azure discovery are set!");
|
||||
return true;
|
||||
} else {
|
||||
logger.debug("One or more Azure discovery settings are missing. " +
|
||||
"Check elasticsearch.yml file. Should have [{}], [{}], [{}] and [{}].",
|
||||
AzureComputeService.Management.SUBSCRIPTION_ID_SETTING.getKey(),
|
||||
AzureComputeService.Management.SERVICE_NAME_SETTING.getKey(),
|
||||
AzureComputeService.Management.KEYSTORE_PATH_SETTING.getKey(),
|
||||
AzureComputeService.Management.KEYSTORE_PASSWORD_SETTING.getKey());
|
||||
return false;
|
||||
@Override
|
||||
public Settings additionalSettings() {
|
||||
// For 5.0, the hosts provider was "zen", but this was before the discovery.zen.hosts_provider
|
||||
// setting existed. This check looks for the legacy setting, and sets hosts provider if set
|
||||
String discoveryType = DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings);
|
||||
if (discoveryType.equals(AZURE)) {
|
||||
deprecationLogger.deprecated("Using " + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey() +
|
||||
" setting to set hosts provider is deprecated. " +
|
||||
"Set \"" + DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey() + ": " + AZURE + "\" instead");
|
||||
if (DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.exists(settings) == false) {
|
||||
return Settings.builder().put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), AZURE).build();
|
||||
}
|
||||
}
|
||||
return Settings.EMPTY;
|
||||
}
|
||||
|
||||
private static boolean isDefined(Settings settings, Setting<String> property) throws ElasticsearchException {
|
||||
return (property.exists(settings) && Strings.hasText(property.get(settings)));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -44,22 +44,28 @@ import org.elasticsearch.SpecialPermission;
|
||||
import org.elasticsearch.cloud.aws.AwsEc2Service;
|
||||
import org.elasticsearch.cloud.aws.AwsEc2ServiceImpl;
|
||||
import org.elasticsearch.cloud.aws.network.Ec2NameResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.logging.DeprecationLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.discovery.Discovery;
|
||||
import org.elasticsearch.discovery.DiscoveryModule;
|
||||
import org.elasticsearch.discovery.ec2.AwsEc2UnicastHostsProvider;
|
||||
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.discovery.zen.ZenPing;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.plugins.DiscoveryPlugin;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
public class Ec2DiscoveryPlugin extends Plugin implements DiscoveryPlugin, Closeable {
|
||||
|
||||
private static Logger logger = Loggers.getLogger(Ec2DiscoveryPlugin.class);
|
||||
private static final DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
|
||||
|
||||
public static final String EC2 = "ec2";
|
||||
|
||||
@ -93,8 +99,12 @@ public class Ec2DiscoveryPlugin extends Plugin implements DiscoveryPlugin, Close
|
||||
this.settings = settings;
|
||||
}
|
||||
|
||||
public void onModule(DiscoveryModule discoveryModule) {
|
||||
discoveryModule.addDiscoveryType(EC2, ZenDiscovery.class);
|
||||
@Override
|
||||
public Map<String, Supplier<Discovery>> getDiscoveryTypes(ThreadPool threadPool, TransportService transportService,
|
||||
ClusterService clusterService, ZenPing zenPing) {
|
||||
// this is for backcompat with pre 5.1, where users would set discovery.type to use ec2 hosts provider
|
||||
return Collections.singletonMap(EC2, () ->
|
||||
new ZenDiscovery(settings, threadPool, transportService, clusterService, clusterService.getClusterSettings(), zenPing));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -147,10 +157,25 @@ public class Ec2DiscoveryPlugin extends Plugin implements DiscoveryPlugin, Close
|
||||
AwsEc2Service.AUTO_ATTRIBUTE_SETTING);
|
||||
}
|
||||
|
||||
/** Adds a node attribute for the ec2 availability zone. */
|
||||
@Override
|
||||
public Settings additionalSettings() {
|
||||
return getAvailabilityZoneNodeAttributes(settings, AwsEc2ServiceImpl.EC2_METADATA_URL + "placement/availability-zone");
|
||||
Settings.Builder builder = Settings.builder();
|
||||
// For 5.0, discovery.type was used prior to the new discovery.zen.hosts_provider
|
||||
// setting existed. This check looks for the legacy setting, and sets hosts provider if set
|
||||
String discoveryType = DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings);
|
||||
if (discoveryType.equals(EC2)) {
|
||||
deprecationLogger.deprecated("Using " + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey() +
|
||||
" setting to set hosts provider is deprecated. " +
|
||||
"Set \"" + DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey() + ": " + EC2 + "\" instead");
|
||||
if (DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.exists(settings) == false) {
|
||||
builder.put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), EC2).build();
|
||||
}
|
||||
}
|
||||
|
||||
// Adds a node attribute for the ec2 availability zone
|
||||
String azMetadataUrl = AwsEc2ServiceImpl.EC2_METADATA_URL + "placement/availability-zone";
|
||||
builder.put(getAvailabilityZoneNodeAttributes(settings, azMetadataUrl));
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
// pkg private for testing
|
||||
|
@ -62,13 +62,13 @@ public class FileBasedDiscoveryPlugin extends Plugin implements DiscoveryPlugin
|
||||
// For 5.0, the hosts provider was "zen", but this was before the discovery.zen.hosts_provider
|
||||
// setting existed. This check looks for the legacy zen, and sets the file hosts provider if not set
|
||||
String discoveryType = DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings);
|
||||
// look at hosts provider setting to avoid fallback as default
|
||||
String hostsProvider = settings.get(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey());
|
||||
if (hostsProvider == null && discoveryType.equals("zen")) {
|
||||
if (discoveryType.equals("zen")) {
|
||||
deprecationLogger.deprecated("Using " + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey() +
|
||||
" setting to set hosts provider is deprecated. " +
|
||||
"Set \"" + DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey() + ": file\" instead");
|
||||
return Settings.builder().put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "file").build();
|
||||
if (DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.exists(settings) == false) {
|
||||
return Settings.builder().put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "file").build();
|
||||
}
|
||||
}
|
||||
return Settings.EMPTY;
|
||||
}
|
||||
|
@ -30,18 +30,23 @@ import org.elasticsearch.cloud.gce.GceInstancesServiceImpl;
|
||||
import org.elasticsearch.cloud.gce.GceMetadataService;
|
||||
import org.elasticsearch.cloud.gce.GceModule;
|
||||
import org.elasticsearch.cloud.gce.network.GceNameResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.component.LifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Module;
|
||||
import org.elasticsearch.common.logging.DeprecationLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.discovery.Discovery;
|
||||
import org.elasticsearch.discovery.DiscoveryModule;
|
||||
import org.elasticsearch.discovery.gce.GceUnicastHostsProvider;
|
||||
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.discovery.zen.ZenPing;
|
||||
import org.elasticsearch.plugins.DiscoveryPlugin;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.Closeable;
|
||||
@ -60,7 +65,8 @@ public class GceDiscoveryPlugin extends Plugin implements DiscoveryPlugin, Close
|
||||
|
||||
public static final String GCE = "gce";
|
||||
private final Settings settings;
|
||||
protected final Logger logger = Loggers.getLogger(GceDiscoveryPlugin.class);
|
||||
private static final Logger logger = Loggers.getLogger(GceDiscoveryPlugin.class);
|
||||
private static final DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
|
||||
// stashed when created in order to properly close
|
||||
private final SetOnce<GceInstancesServiceImpl> gceInstancesService = new SetOnce<>();
|
||||
|
||||
@ -91,9 +97,12 @@ public class GceDiscoveryPlugin extends Plugin implements DiscoveryPlugin, Close
|
||||
logger.trace("starting gce discovery plugin...");
|
||||
}
|
||||
|
||||
public void onModule(DiscoveryModule discoveryModule) {
|
||||
logger.debug("Register gce discovery type and gce unicast provider");
|
||||
discoveryModule.addDiscoveryType(GCE, ZenDiscovery.class);
|
||||
@Override
|
||||
public Map<String, Supplier<Discovery>> getDiscoveryTypes(ThreadPool threadPool, TransportService transportService,
|
||||
ClusterService clusterService, ZenPing zenPing) {
|
||||
// this is for backcompat with pre 5.1, where users would set discovery.type to use ec2 hosts provider
|
||||
return Collections.singletonMap(GCE, () ->
|
||||
new ZenDiscovery(settings, threadPool, transportService, clusterService, clusterService.getClusterSettings(), zenPing));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -123,6 +132,22 @@ public class GceDiscoveryPlugin extends Plugin implements DiscoveryPlugin, Close
|
||||
GceInstancesService.MAX_WAIT_SETTING);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Settings additionalSettings() {
|
||||
// For 5.0, the hosts provider was "zen", but this was before the discovery.zen.hosts_provider
|
||||
// setting existed. This check looks for the legacy setting, and sets hosts provider if set
|
||||
String discoveryType = DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings);
|
||||
if (discoveryType.equals(GCE)) {
|
||||
deprecationLogger.deprecated("Using " + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey() +
|
||||
" setting to set hosts provider is deprecated. " +
|
||||
"Set \"" + DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey() + ": " + GCE + "\" instead");
|
||||
if (DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.exists(settings) == false) {
|
||||
return Settings.builder().put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), GCE).build();
|
||||
}
|
||||
}
|
||||
return Settings.EMPTY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
IOUtils.close(gceInstancesService.get());
|
||||
|
Loading…
x
Reference in New Issue
Block a user