Plugins: Make UnicastHostsProvider extension pull based (#21036)

This change moves providing UnicastHostsProvider for zen discovery to be
pull based, adding a getter in DiscoveryPlugin. A new setting is added,
discovery.zen.hosts_provider, to separate the discovery type from the
hosts provider for zen when it is selected. Unfortunately existing
plugins added ZenDiscovery with their own name in order to just provide
a hosts provider, so there are already many users setting the hosts
provider through discovery.type. This change also includes backcompat,
falling back to discovery.type when discovery.zen.hosts_provider is not
set.
This commit is contained in:
Ryan Ernst 2016-10-20 09:13:59 -07:00 committed by GitHub
parent efffb946e2
commit 60353a245a
27 changed files with 394 additions and 378 deletions

View File

@ -328,6 +328,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
NodeEnvironment.NODE_ID_SEED_SETTING,
DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING,
DiscoveryModule.DISCOVERY_TYPE_SETTING,
DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING,
FaultDetection.PING_RETRIES_SETTING,
FaultDetection.PING_TIMEOUT_SETTING,
FaultDetection.REGISTER_CONNECTION_LISTENER_SETTING,

View File

@ -20,23 +20,26 @@
package org.elasticsearch.discovery;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.ExtensionPoint;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.discovery.zen.ZenPing;
import org.elasticsearch.discovery.zen.ZenPingService;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.UnicastZenPing;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* A module for loading classes for node discovery.
@ -44,33 +47,31 @@ import java.util.function.Function;
public class DiscoveryModule extends AbstractModule {
public static final Setting<String> DISCOVERY_TYPE_SETTING =
new Setting<>("discovery.type", "zen", Function.identity(),
Property.NodeScope);
new Setting<>("discovery.type", "zen", Function.identity(), Property.NodeScope);
public static final Setting<String> DISCOVERY_HOSTS_PROVIDER_SETTING =
new Setting<>("discovery.zen.hosts_provider", DISCOVERY_TYPE_SETTING, Function.identity(), Property.NodeScope);
private final Settings settings;
private final Map<String, List<Class<? extends UnicastHostsProvider>>> unicastHostProviders = new HashMap<>();
private final Map<String, Supplier<UnicastHostsProvider>> unicastHostProviders;
private final ExtensionPoint.ClassSet<ZenPing> zenPings = new ExtensionPoint.ClassSet<>("zen_ping", ZenPing.class);
private final Map<String, Class<? extends Discovery>> discoveryTypes = new HashMap<>();
public DiscoveryModule(Settings settings) {
public DiscoveryModule(Settings settings, TransportService transportService, NetworkService networkService,
List<DiscoveryPlugin> plugins) {
this.settings = settings;
addDiscoveryType("none", NoneDiscovery.class);
addDiscoveryType("zen", ZenDiscovery.class);
}
/**
* Adds a custom unicast hosts provider to build a dynamic list of unicast hosts list when doing unicast discovery.
*
* @param type discovery for which this provider is relevant
* @param unicastHostProvider the host provider
*/
public void addUnicastHostProvider(String type, Class<? extends UnicastHostsProvider> unicastHostProvider) {
List<Class<? extends UnicastHostsProvider>> providerList = unicastHostProviders.get(type);
if (providerList == null) {
providerList = new ArrayList<>();
unicastHostProviders.put(type, providerList);
Map<String, Supplier<UnicastHostsProvider>> hostProviders = new HashMap<>();
hostProviders.put("zen", () -> Collections::emptyList);
for (DiscoveryPlugin plugin : plugins) {
plugin.getZenHostsProviders(transportService, networkService).entrySet().forEach(entry -> {
if (hostProviders.put(entry.getKey(), entry.getValue()) != null) {
throw new IllegalArgumentException("Cannot specify zen hosts provider [" + entry.getKey() + "] twice");
}
});
}
providerList.add(unicastHostProvider);
unicastHostProviders = Collections.unmodifiableMap(hostProviders);
}
/**
@ -97,11 +98,13 @@ public class DiscoveryModule extends AbstractModule {
if (discoveryType.equals("none") == false) {
bind(ZenPingService.class).asEagerSingleton();
Multibinder<UnicastHostsProvider> unicastHostsProviderMultibinder = Multibinder.newSetBinder(binder(), UnicastHostsProvider.class);
for (Class<? extends UnicastHostsProvider> unicastHostProvider :
unicastHostProviders.getOrDefault(discoveryType, Collections.emptyList())) {
unicastHostsProviderMultibinder.addBinding().to(unicastHostProvider);
String hostsProviderName = DISCOVERY_HOSTS_PROVIDER_SETTING.get(settings);
Supplier<UnicastHostsProvider> hostsProviderSupplier = unicastHostProviders.get(hostsProviderName);
if (hostsProviderSupplier == null) {
throw new IllegalArgumentException("Unknown zen hosts provider [" + hostsProviderName + "]");
}
UnicastHostsProvider hostsProvider = Objects.requireNonNull(hostsProviderSupplier.get());
bind(UnicastHostsProvider.class).toInstance(hostsProvider);
if (zenPings.isEmpty()) {
zenPings.registerExtension(UnicastZenPing.class);
}

View File

@ -96,7 +96,6 @@ public class UnicastZenPing extends AbstractLifecycleComponent implements ZenPin
public static final int LIMIT_FOREIGN_PORTS_COUNT = 1;
public static final int LIMIT_LOCAL_PORTS_COUNT = 5;
private final ThreadPool threadPool;
private final TransportService transportService;
private final ClusterName clusterName;
@ -120,7 +119,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent implements ZenPin
// a list of temporal responses a node will return for a request (holds requests from other configuredTargetNodes)
private final Queue<PingResponse> temporalResponses = ConcurrentCollections.newQueue();
private final CopyOnWriteArrayList<UnicastHostsProvider> hostsProviders = new CopyOnWriteArrayList<>();
private final UnicastHostsProvider hostsProvider;
private final ExecutorService unicastConnectExecutor;
@ -128,17 +127,13 @@ public class UnicastZenPing extends AbstractLifecycleComponent implements ZenPin
@Inject
public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
@Nullable Set<UnicastHostsProvider> unicastHostsProviders) {
UnicastHostsProvider unicastHostsProviders) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
if (unicastHostsProviders != null) {
for (UnicastHostsProvider unicastHostsProvider : unicastHostsProviders) {
addHostsProvider(unicastHostsProvider);
}
}
this.hostsProvider = unicastHostsProviders;
this.concurrentConnects = DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING.get(settings);
List<String> hosts = DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings);
@ -213,10 +208,6 @@ public class UnicastZenPing extends AbstractLifecycleComponent implements ZenPin
closed = true;
}
public void addHostsProvider(UnicastHostsProvider provider) {
hostsProviders.add(provider);
}
@Override
public void setPingContextProvider(PingContextProvider contextProvider) {
this.contextProvider = contextProvider;
@ -345,10 +336,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent implements ZenPin
nodesToPingSet.add(temporalResponse.node());
}
}
for (UnicastHostsProvider provider : hostsProviders) {
nodesToPingSet.addAll(provider.buildDynamicNodes());
}
nodesToPingSet.addAll(hostsProvider.buildDynamicNodes());
// add all possible master nodes that were active in the last known cluster configuration
for (ObjectCursor<DiscoveryNode> masterNode : discoNodes.getMasterNodes().values()) {

View File

@ -331,7 +331,6 @@ public class Node implements Closeable {
}
final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool);
modules.add(new NodeModule(this, monitorService));
modules.add(new DiscoveryModule(this.settings));
ClusterModule clusterModule = new ClusterModule(settings, clusterService,
pluginsService.filterPlugins(ClusterPlugin.class));
modules.add(clusterModule);
@ -344,7 +343,6 @@ public class Node implements Closeable {
modules.add(actionModule);
modules.add(new GatewayModule());
modules.add(new RepositoriesModule(this.environment, pluginsService.filterPlugins(RepositoryPlugin.class)));
pluginsService.processModules(modules);
CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(),
settingsModule.getClusterSettings());
resourcesToClose.add(circuitBreakerService);
@ -395,6 +393,8 @@ public class Node implements Closeable {
b.bind(HttpServer.class).toProvider(Providers.of(null));
};
}
modules.add(new DiscoveryModule(this.settings, transportService, networkService, pluginsService.filterPlugins(DiscoveryPlugin.class)));
pluginsService.processModules(modules);
modules.add(b -> {
b.bind(IndicesQueriesRegistry.class).toInstance(searchModule.getQueryParserRegistry());
b.bind(SearchRequestParsers.class).toInstance(searchModule.getSearchRequestParsers());

View File

@ -19,8 +19,14 @@
package org.elasticsearch.plugins;
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.transport.TransportService;
/**
* An additional extension point for {@link Plugin}s that extends Elasticsearch's discovery functionality. To add an additional
@ -52,4 +58,20 @@ public interface DiscoveryPlugin {
default NetworkService.CustomNameResolver getCustomNameResolver(Settings settings) {
return null;
}
/**
* Returns providers of unicast host lists for zen discovery.
*
* The key of the returned map is the name of the host provider
* (see {@link org.elasticsearch.discovery.DiscoveryModule#DISCOVERY_HOSTS_PROVIDER_SETTING}), and
* the value is a supplier to construct the host provider when it is selected for use.
*
* @param transportService Use to form the {@link org.elasticsearch.common.transport.TransportAddress} portion
* of a {@link org.elasticsearch.cluster.node.DiscoveryNode}
* @param networkService Use to find the publish host address of the current node
*/
default Map<String, Supplier<UnicastHostsProvider>> getZenHostsProviders(TransportService transportService,
NetworkService networkService) {
return Collections.emptyMap();
}
}

View File

@ -18,31 +18,73 @@
*/
package org.elasticsearch.discovery;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;
import org.elasticsearch.common.inject.ModuleTestCase;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.test.NoopDiscovery;
import org.elasticsearch.transport.TransportService;
public class DiscoveryModuleTests extends ModuleTestCase {
public static class DummyMasterElectionService extends ElectMasterService {
public DummyMasterElectionService(Settings settings) {
super(settings);
public interface DummyDiscoPlugin extends DiscoveryPlugin {
Map<String, Supplier<UnicastHostsProvider>> impl();
@Override
default Map<String, Supplier<UnicastHostsProvider>> getZenHostsProviders(TransportService transportService,
NetworkService networkService) {
return impl();
}
}
public void testRegisterDefaults() {
Settings settings = Settings.EMPTY;
DiscoveryModule module = new DiscoveryModule(settings);
DiscoveryModule module = new DiscoveryModule(settings, null, null, Collections.emptyList());
assertBinding(module, Discovery.class, ZenDiscovery.class);
}
public void testRegisterDiscovery() {
Settings settings = Settings.builder().put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "custom").build();
DiscoveryModule module = new DiscoveryModule(settings);
DummyDiscoPlugin plugin = () -> Collections.singletonMap("custom", () -> Collections::emptyList);
DiscoveryModule module = new DiscoveryModule(settings, null, null, Collections.singletonList(plugin));
module.addDiscoveryType("custom", NoopDiscovery.class);
assertBinding(module, Discovery.class, NoopDiscovery.class);
}
public void testHostsProvider() {
Settings settings = Settings.builder().put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "custom").build();
final UnicastHostsProvider provider = Collections::emptyList;
DummyDiscoPlugin plugin = () -> Collections.singletonMap("custom", () -> provider);
DiscoveryModule module = new DiscoveryModule(settings, null, null, Collections.singletonList(plugin));
assertInstanceBinding(module, UnicastHostsProvider.class, instance -> instance == provider);
}
public void testHostsProviderBwc() {
Settings settings = Settings.builder().put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "custom").build();
final UnicastHostsProvider provider = Collections::emptyList;
DummyDiscoPlugin plugin = () -> Collections.singletonMap("custom", () -> provider);
DiscoveryModule module = new DiscoveryModule(settings, null, null, Collections.singletonList(plugin));
module.addDiscoveryType("custom", NoopDiscovery.class);
assertInstanceBinding(module, UnicastHostsProvider.class, instance -> instance == provider);
}
public void testUnknownHostsProvider() {
Settings settings = Settings.builder().put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "dne").build();
DiscoveryModule module = new DiscoveryModule(settings, null, null, Collections.emptyList());
assertBindingFailure(module, "Unknown zen hosts provider");
}
public void testDuplicateHostsProvider() {
DummyDiscoPlugin plugin1 = () -> Collections.singletonMap("dup", () -> null);
DummyDiscoPlugin plugin2 = () -> Collections.singletonMap("dup", () -> null);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
new DiscoveryModule(Settings.EMPTY, null, null, Arrays.asList(plugin1, plugin2)));
assertEquals("Cannot specify zen hosts provider [dup] twice", e.getMessage());
}
}

View File

@ -60,6 +60,8 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
public class UnicastZenPingTests extends ESTestCase {
private static final UnicastHostsProvider EMPTY_HOSTS_PROVIDER = Collections::emptyList;
public void testSimplePings() throws InterruptedException {
int startPort = 11000 + randomIntBetween(0, 1000);
int endPort = startPort + 10;
@ -94,7 +96,7 @@ public class UnicastZenPingTests extends ESTestCase {
.build();
Settings hostsSettingsMismatch = Settings.builder().put(hostsSettings).put(settingsMismatch).build();
UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, handleA.transportService, null);
UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, handleA.transportService, EMPTY_HOSTS_PROVIDER);
zenPingA.setPingContextProvider(new PingContextProvider() {
@Override
public DiscoveryNodes nodes() {
@ -108,7 +110,7 @@ public class UnicastZenPingTests extends ESTestCase {
});
zenPingA.start();
UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, handleB.transportService, null);
UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, handleB.transportService, EMPTY_HOSTS_PROVIDER);
zenPingB.setPingContextProvider(new PingContextProvider() {
@Override
public DiscoveryNodes nodes() {
@ -122,7 +124,7 @@ public class UnicastZenPingTests extends ESTestCase {
});
zenPingB.start();
UnicastZenPing zenPingC = new UnicastZenPing(hostsSettingsMismatch, threadPool, handleC.transportService, null) {
UnicastZenPing zenPingC = new UnicastZenPing(hostsSettingsMismatch, threadPool, handleC.transportService, EMPTY_HOSTS_PROVIDER) {
@Override
protected Version getVersion() {
return versionD;
@ -141,7 +143,7 @@ public class UnicastZenPingTests extends ESTestCase {
});
zenPingC.start();
UnicastZenPing zenPingD = new UnicastZenPing(hostsSettingsMismatch, threadPool, handleD.transportService, null);
UnicastZenPing zenPingD = new UnicastZenPing(hostsSettingsMismatch, threadPool, handleD.transportService, EMPTY_HOSTS_PROVIDER);
zenPingD.setPingContextProvider(new PingContextProvider() {
@Override
public DiscoveryNodes nodes() {

View File

@ -1,99 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.azure.classic;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cloud.azure.classic.management.AzureComputeService;
import org.elasticsearch.cloud.azure.classic.management.AzureComputeServiceImpl;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.plugin.discovery.azure.classic.AzureDiscoveryPlugin;
/**
* Azure Module
*
* <ul>
* <li>If needed this module will bind azure discovery service by default
* to AzureComputeServiceImpl.</li>
* </ul>
*
* @see AzureComputeServiceImpl
*/
public class AzureDiscoveryModule extends AbstractModule {
protected final Logger logger;
private Settings settings;
// pkg private so it is settable by tests
Class<? extends AzureComputeService> computeServiceImpl = AzureComputeServiceImpl.class;
@Inject
public AzureDiscoveryModule(Settings settings) {
this.settings = settings;
this.logger = Loggers.getLogger(getClass(), settings);
}
@Override
protected void configure() {
logger.debug("starting azure services");
// If we have set discovery to azure, let's start the azure compute service
if (isDiscoveryReady(settings, logger)) {
logger.debug("starting azure discovery service");
bind(AzureComputeService.class).to(computeServiceImpl).asEagerSingleton();
}
}
/**
* Check if discovery is meant to start
* @return true if we can start discovery features
*/
public static boolean isDiscoveryReady(Settings settings, Logger logger) {
// User set discovery.type: azure
if (!AzureDiscoveryPlugin.AZURE.equalsIgnoreCase(DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings))) {
logger.trace("discovery.type not set to {}", AzureDiscoveryPlugin.AZURE);
return false;
}
if (isDefined(settings, AzureComputeService.Management.SUBSCRIPTION_ID_SETTING) &&
isDefined(settings, AzureComputeService.Management.SERVICE_NAME_SETTING) &&
isDefined(settings, AzureComputeService.Management.KEYSTORE_PATH_SETTING) &&
isDefined(settings, AzureComputeService.Management.KEYSTORE_PASSWORD_SETTING)) {
logger.trace("All required properties for Azure discovery are set!");
return true;
} else {
logger.debug("One or more Azure discovery settings are missing. " +
"Check elasticsearch.yml file. Should have [{}], [{}], [{}] and [{}].",
AzureComputeService.Management.SUBSCRIPTION_ID_SETTING.getKey(),
AzureComputeService.Management.SERVICE_NAME_SETTING.getKey(),
AzureComputeService.Management.KEYSTORE_PATH_SETTING.getKey(),
AzureComputeService.Management.KEYSTORE_PASSWORD_SETTING.getKey());
return false;
}
}
private static boolean isDefined(Settings settings, Setting<String> property) throws ElasticsearchException {
return (property.exists(settings) && Strings.hasText(property.get(settings)));
}
}

View File

@ -19,6 +19,9 @@
package org.elasticsearch.cloud.azure.classic.management;
import java.io.IOException;
import java.util.ServiceLoader;
import com.microsoft.windowsazure.Configuration;
import com.microsoft.windowsazure.core.Builder;
import com.microsoft.windowsazure.core.DefaultBuilder;
@ -30,19 +33,14 @@ import com.microsoft.windowsazure.management.configuration.ManagementConfigurati
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cloud.azure.classic.AzureServiceRemoteException;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import java.io.IOException;
import java.util.ServiceLoader;
public class AzureComputeServiceImpl extends AbstractLifecycleComponent
implements AzureComputeService {
private final ComputeManagementClient client;
private final String serviceName;
@Inject
public AzureComputeServiceImpl(Settings settings) {
super(settings);
String subscriptionId = Management.SUBSCRIPTION_ID_SETTING.get(settings);

View File

@ -107,10 +107,8 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic
private final String deploymentName;
private final DeploymentSlot deploymentSlot;
@Inject
public AzureUnicastHostsProvider(Settings settings, AzureComputeService azureComputeService,
TransportService transportService,
NetworkService networkService) {
TransportService transportService, NetworkService networkService) {
super(settings);
this.azureComputeService = azureComputeService;
this.transportService = transportService;

View File

@ -19,28 +19,34 @@
package org.elasticsearch.plugin.discovery.azure.classic;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cloud.azure.classic.AzureDiscoveryModule;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cloud.azure.classic.management.AzureComputeService;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.cloud.azure.classic.management.AzureComputeServiceImpl;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.azure.classic.AzureUnicastHostsProvider;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.TransportService;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
public class AzureDiscoveryPlugin extends Plugin {
public class AzureDiscoveryPlugin extends Plugin implements DiscoveryPlugin {
public static final String AZURE = "azure";
private final Settings settings;
protected final Settings settings;
protected final Logger logger = Loggers.getLogger(AzureDiscoveryPlugin.class);
public AzureDiscoveryPlugin(Settings settings) {
@ -50,15 +56,21 @@ public class AzureDiscoveryPlugin extends Plugin {
logger.trace("starting azure classic discovery plugin...");
}
// overrideable for tests
protected AzureComputeService createComputeService() {
return new AzureComputeServiceImpl(settings);
}
@Override
public Collection<Module> createGuiceModules() {
return Collections.singletonList((Module) new AzureDiscoveryModule(settings));
public Map<String, Supplier<UnicastHostsProvider>> getZenHostsProviders(TransportService transportService,
NetworkService networkService) {
return Collections.singletonMap(AZURE,
() -> new AzureUnicastHostsProvider(settings, createComputeService(), transportService, networkService));
}
public void onModule(DiscoveryModule discoveryModule) {
if (AzureDiscoveryModule.isDiscoveryReady(settings, logger)) {
if (isDiscoveryReady(settings, logger)) {
discoveryModule.addDiscoveryType(AZURE, ZenDiscovery.class);
discoveryModule.addUnicastHostProvider(AZURE, AzureUnicastHostsProvider.class);
}
}
@ -76,4 +88,36 @@ public class AzureDiscoveryPlugin extends Plugin {
AzureComputeService.Discovery.ENDPOINT_NAME_SETTING);
}
/**
* Check if discovery is meant to start
* @return true if we can start discovery features
*/
private static boolean isDiscoveryReady(Settings settings, Logger logger) {
// User set discovery.type: azure
if (!AzureDiscoveryPlugin.AZURE.equalsIgnoreCase(DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings))) {
logger.trace("discovery.type not set to {}", AzureDiscoveryPlugin.AZURE);
return false;
}
if (isDefined(settings, AzureComputeService.Management.SUBSCRIPTION_ID_SETTING) &&
isDefined(settings, AzureComputeService.Management.SERVICE_NAME_SETTING) &&
isDefined(settings, AzureComputeService.Management.KEYSTORE_PATH_SETTING) &&
isDefined(settings, AzureComputeService.Management.KEYSTORE_PASSWORD_SETTING)) {
logger.trace("All required properties for Azure discovery are set!");
return true;
} else {
logger.debug("One or more Azure discovery settings are missing. " +
"Check elasticsearch.yml file. Should have [{}], [{}], [{}] and [{}].",
AzureComputeService.Management.SUBSCRIPTION_ID_SETTING.getKey(),
AzureComputeService.Management.SERVICE_NAME_SETTING.getKey(),
AzureComputeService.Management.KEYSTORE_PATH_SETTING.getKey(),
AzureComputeService.Management.KEYSTORE_PASSWORD_SETTING.getKey());
return false;
}
}
private static boolean isDefined(Settings settings, Setting<String> property) throws ElasticsearchException {
return (property.exists(settings) && Strings.hasText(property.get(settings)));
}
}

View File

@ -57,7 +57,7 @@ public abstract class AbstractAzureComputeServiceTestCase extends ESIntegTestCas
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(AzureDiscoveryPlugin.class, mockPlugin);
return Arrays.asList(mockPlugin);
}
protected void checkNumberOfNodes(int expected) {

View File

@ -19,32 +19,35 @@
package org.elasticsearch.cloud.azure.classic;
import java.net.InetAddress;
import com.microsoft.windowsazure.management.compute.models.DeploymentSlot;
import com.microsoft.windowsazure.management.compute.models.DeploymentStatus;
import com.microsoft.windowsazure.management.compute.models.HostedServiceGetDetailedResponse;
import com.microsoft.windowsazure.management.compute.models.InstanceEndpoint;
import com.microsoft.windowsazure.management.compute.models.RoleInstance;
import org.elasticsearch.cloud.azure.classic.management.AzureComputeService;
import org.elasticsearch.cloud.azure.classic.management.AzureComputeServiceAbstractMock;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.plugins.Plugin;
import java.net.InetAddress;
import org.elasticsearch.plugin.discovery.azure.classic.AzureDiscoveryPlugin;
/**
* Mock Azure API with a single started node
*/
public class AzureComputeServiceSimpleMock extends AzureComputeServiceAbstractMock {
public static class TestPlugin extends Plugin {
public void onModule(AzureDiscoveryModule azureDiscoveryModule) {
azureDiscoveryModule.computeServiceImpl = AzureComputeServiceSimpleMock.class;
public static class TestPlugin extends AzureDiscoveryPlugin {
public TestPlugin(Settings settings) {
super(settings);
}
@Override
protected AzureComputeService createComputeService() {
return new AzureComputeServiceSimpleMock(settings);
}
}
@Inject
public AzureComputeServiceSimpleMock(Settings settings) {
private AzureComputeServiceSimpleMock(Settings settings) {
super(settings);
}

View File

@ -19,20 +19,19 @@
package org.elasticsearch.cloud.azure.classic;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import com.microsoft.windowsazure.management.compute.models.DeploymentSlot;
import com.microsoft.windowsazure.management.compute.models.DeploymentStatus;
import com.microsoft.windowsazure.management.compute.models.HostedServiceGetDetailedResponse;
import com.microsoft.windowsazure.management.compute.models.InstanceEndpoint;
import com.microsoft.windowsazure.management.compute.models.RoleInstance;
import org.elasticsearch.cloud.azure.classic.management.AzureComputeService;
import org.elasticsearch.cloud.azure.classic.management.AzureComputeServiceAbstractMock;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import org.elasticsearch.plugin.discovery.azure.classic.AzureDiscoveryPlugin;
import static org.elasticsearch.common.util.CollectionUtils.newSingletonArrayList;
@ -41,18 +40,19 @@ import static org.elasticsearch.common.util.CollectionUtils.newSingletonArrayLis
* Mock Azure API with two started nodes
*/
public class AzureComputeServiceTwoNodesMock extends AzureComputeServiceAbstractMock {
public static class TestPlugin extends Plugin {
public void onModule(AzureDiscoveryModule azureDiscoveryModule) {
azureDiscoveryModule.computeServiceImpl = AzureComputeServiceTwoNodesMock.class;
public static class TestPlugin extends AzureDiscoveryPlugin {
public TestPlugin(Settings settings) {
super(settings);
}
@Override
protected AzureComputeService createComputeService() {
return new AzureComputeServiceTwoNodesMock(settings);
}
}
NetworkService networkService;
@Inject
protected AzureComputeServiceTwoNodesMock(Settings settings, NetworkService networkService) {
private AzureComputeServiceTwoNodesMock(Settings settings) {
super(settings);
this.networkService = networkService;
}
@Override

View File

@ -19,6 +19,10 @@
package org.elasticsearch.cloud.aws;
import java.io.Closeable;
import java.io.IOException;
import java.util.Random;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.ClientConfiguration;
@ -31,22 +35,17 @@ import com.amazonaws.retry.RetryPolicy;
import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.AmazonEC2Client;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import java.util.Random;
public class AwsEc2ServiceImpl extends AbstractLifecycleComponent implements AwsEc2Service {
public class AwsEc2ServiceImpl extends AbstractComponent implements AwsEc2Service, Closeable {
public static final String EC2_METADATA_URL = "http://169.254.169.254/latest/meta-data/";
private AmazonEC2Client client;
@Inject
public AwsEc2ServiceImpl(Settings settings) {
super(settings);
}
@ -195,15 +194,7 @@ public class AwsEc2ServiceImpl extends AbstractLifecycleComponent implements Aws
}
@Override
protected void doStart() throws ElasticsearchException {
}
@Override
protected void doStop() throws ElasticsearchException {
}
@Override
protected void doClose() throws ElasticsearchException {
public void close() throws IOException {
if (client != null) {
client.shutdown();
}

View File

@ -1,31 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws;
import org.elasticsearch.common.inject.AbstractModule;
public class Ec2Module extends AbstractModule {
@Override
protected void configure() {
bind(AwsEc2Service.class).to(AwsEc2ServiceImpl.class).asEagerSingleton();
}
}

View File

@ -19,6 +19,12 @@
package org.elasticsearch.discovery.ec2;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.model.DescribeInstancesRequest;
@ -35,7 +41,6 @@ import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cloud.aws.AwsEc2Service.DISCOVERY_EC2;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
@ -45,12 +50,6 @@ import org.elasticsearch.transport.TransportService;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static java.util.Collections.disjoint;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
@ -73,7 +72,6 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
private final DiscoNodesCache discoNodes;
@Inject
public AwsEc2UnicastHostsProvider(Settings settings, TransportService transportService, AwsEc2Service awsEc2Service) {
super(settings);
this.transportService = transportService;

View File

@ -19,26 +19,8 @@
package org.elasticsearch.plugin.discovery.ec2;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cloud.aws.AwsEc2ServiceImpl;
import org.elasticsearch.cloud.aws.Ec2Module;
import org.elasticsearch.cloud.aws.network.Ec2NameResolver;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.ec2.AwsEc2UnicastHostsProvider;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.plugins.Plugin;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@ -48,12 +30,33 @@ import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
public class Ec2DiscoveryPlugin extends Plugin implements DiscoveryPlugin {
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cloud.aws.AwsEc2ServiceImpl;
import org.elasticsearch.cloud.aws.network.Ec2NameResolver;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.ec2.AwsEc2UnicastHostsProvider;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.TransportService;
public class Ec2DiscoveryPlugin extends Plugin implements DiscoveryPlugin, Closeable {
private static Logger logger = Loggers.getLogger(Ec2DiscoveryPlugin.class);
@ -80,29 +83,15 @@ public class Ec2DiscoveryPlugin extends Plugin implements DiscoveryPlugin {
}
private Settings settings;
// stashed when created in order to properly close
private final SetOnce<AwsEc2ServiceImpl> ec2Service = new SetOnce<>();
public Ec2DiscoveryPlugin(Settings settings) {
this.settings = settings;
}
@Override
public Collection<Module> createGuiceModules() {
Collection<Module> modules = new ArrayList<>();
modules.add(new Ec2Module());
return modules;
}
@Override
@SuppressWarnings("rawtypes") // Supertype uses rawtype
public Collection<Class<? extends LifecycleComponent>> getGuiceServiceClasses() {
Collection<Class<? extends LifecycleComponent>> services = new ArrayList<>();
services.add(AwsEc2ServiceImpl.class);
return services;
}
public void onModule(DiscoveryModule discoveryModule) {
discoveryModule.addDiscoveryType(EC2, ZenDiscovery.class);
discoveryModule.addUnicastHostProvider(EC2, AwsEc2UnicastHostsProvider.class);
}
@Override
@ -111,6 +100,15 @@ public class Ec2DiscoveryPlugin extends Plugin implements DiscoveryPlugin {
return new Ec2NameResolver(settings);
}
@Override
public Map<String, Supplier<UnicastHostsProvider>> getZenHostsProviders(TransportService transportService,
NetworkService networkService) {
return Collections.singletonMap(EC2, () -> {
ec2Service.set(new AwsEc2ServiceImpl(settings));
return new AwsEc2UnicastHostsProvider(settings, transportService, ec2Service.get());
});
}
@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(
@ -187,4 +185,9 @@ public class Ec2DiscoveryPlugin extends Plugin implements DiscoveryPlugin {
return attrs.build();
}
@Override
public void close() throws IOException {
IOUtils.close(ec2Service.get());
}
}

View File

@ -19,13 +19,21 @@
package org.elasticsearch.discovery.file;
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.TransportService;
/**
* Plugin for providing file-based unicast hosts discovery. The list of unicast hosts
@ -35,17 +43,33 @@ import org.elasticsearch.plugins.Plugin;
public class FileBasedDiscoveryPlugin extends Plugin implements DiscoveryPlugin {
private static final Logger logger = Loggers.getLogger(FileBasedDiscoveryPlugin.class);
private static final DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
private final Settings settings;
public FileBasedDiscoveryPlugin(Settings settings) {
this.settings = settings;
logger.trace("starting file-based discovery plugin...");
}
public void onModule(DiscoveryModule discoveryModule) {
logger.trace("registering file-based unicast hosts provider");
// using zen discovery for the discovery type and we're just adding a unicast host provider for it
discoveryModule.addUnicastHostProvider("zen", FileBasedUnicastHostsProvider.class);
@Override
public Map<String, Supplier<UnicastHostsProvider>> getZenHostsProviders(TransportService transportService,
NetworkService networkService) {
return Collections.singletonMap("file", () -> new FileBasedUnicastHostsProvider(settings, transportService));
}
@Override
public Settings additionalSettings() {
// For 5.0, the hosts provider was "zen", but this was before the discovery.zen.hosts_provider
// setting existed. This check looks for the legacy zen, and sets the file hosts provider if not set
String discoveryType = DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings);
// look at hosts provider setting to avoid fallback as default
String hostsProvider = settings.get(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey());
if (hostsProvider == null && discoveryType.equals("zen")) {
deprecationLogger.deprecated("Using " + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey() +
" setting to set hosts provider is deprecated. " +
"Set \"" + DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey() + ": file\" instead");
return Settings.builder().put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "file").build();
}
return Settings.EMPTY;
}
}

View File

@ -55,7 +55,7 @@ import static org.elasticsearch.discovery.zen.UnicastZenPing.resolveDiscoveryNod
* 67.81.244.11:9305
* 67.81.244.15:9400
*/
public class FileBasedUnicastHostsProvider extends AbstractComponent implements UnicastHostsProvider {
class FileBasedUnicastHostsProvider extends AbstractComponent implements UnicastHostsProvider {
static final String UNICAST_HOSTS_FILE = "unicast_hosts.txt";
static final String UNICAST_HOST_PREFIX = "#zen_file_unicast_host_";
@ -66,8 +66,7 @@ public class FileBasedUnicastHostsProvider extends AbstractComponent implements
private final AtomicLong nodeIdGenerator = new AtomicLong(); // generates unique ids for the node
@Inject
public FileBasedUnicastHostsProvider(Settings settings, TransportService transportService) {
FileBasedUnicastHostsProvider(Settings settings, TransportService transportService) {
super(settings);
this.transportService = transportService;
this.unicastHostsFilePath = new Environment(settings).configFile().resolve("discovery-file").resolve(UNICAST_HOSTS_FILE);

View File

@ -0,0 +1,39 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.file;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.test.ESTestCase;
public class FileBasedDiscoveryPluginTests extends ESTestCase {
public void testHostsProviderBwc() {
FileBasedDiscoveryPlugin plugin = new FileBasedDiscoveryPlugin(Settings.EMPTY);
Settings additionalSettings = plugin.additionalSettings();
assertEquals("file", additionalSettings.get(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey()));
}
public void testHostsProviderExplicit() {
Settings settings = Settings.builder().put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "foo").build();
FileBasedDiscoveryPlugin plugin = new FileBasedDiscoveryPlugin(settings);
assertEquals(Settings.EMPTY, plugin.additionalSettings());
}
}

View File

@ -25,12 +25,13 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.TimeValue;
import java.io.Closeable;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
public interface GceInstancesService extends LifecycleComponent {
public interface GceInstancesService {
/**
* GCE API Version: Elasticsearch/GceCloud/1.0

View File

@ -19,27 +19,7 @@
package org.elasticsearch.cloud.gce;
import com.google.api.client.googleapis.compute.ComputeCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.compute.Compute;
import com.google.api.services.compute.model.Instance;
import com.google.api.services.compute.model.InstanceList;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.gce.RetryHttpInitializerWrapper;
import java.io.Closeable;
import java.io.IOException;
import java.security.AccessController;
import java.security.GeneralSecurityException;
@ -51,7 +31,26 @@ import java.util.Collections;
import java.util.List;
import java.util.function.Function;
public class GceInstancesServiceImpl extends AbstractLifecycleComponent implements GceInstancesService {
import com.google.api.client.googleapis.compute.ComputeCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.compute.Compute;
import com.google.api.services.compute.model.Instance;
import com.google.api.services.compute.model.InstanceList;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.gce.RetryHttpInitializerWrapper;
public class GceInstancesServiceImpl extends AbstractComponent implements GceInstancesService, Closeable {
// all settings just used for testing - not registered by default
public static final Setting<Boolean> GCE_VALIDATE_CERTIFICATES =
@ -113,7 +112,6 @@ public class GceInstancesServiceImpl extends AbstractLifecycleComponent implemen
private final boolean validateCerts;
@Inject
public GceInstancesServiceImpl(Settings settings) {
super(settings);
this.project = PROJECT_SETTING.get(settings);
@ -204,22 +202,9 @@ public class GceInstancesServiceImpl extends AbstractLifecycleComponent implemen
}
@Override
protected void doStart() throws ElasticsearchException {
}
@Override
protected void doStop() throws ElasticsearchException {
public void close() throws IOException {
if (gceHttpTransport != null) {
try {
gceHttpTransport.shutdown();
} catch (IOException e) {
logger.warn("unable to shutdown GCE Http Transport", e);
}
gceHttpTransport = null;
gceHttpTransport.shutdown();
}
}
@Override
protected void doClose() throws ElasticsearchException {
}
}

View File

@ -19,19 +19,6 @@
package org.elasticsearch.cloud.gce;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.GenericUrl;
import com.google.api.client.http.HttpHeaders;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpTransport;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cloud.gce.network.GceNameResolver;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@ -41,6 +28,16 @@ import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.function.Function;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.GenericUrl;
import com.google.api.client.http.HttpHeaders;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpTransport;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
public class GceMetadataService extends AbstractLifecycleComponent {
// Forcing Google Token API URL as set in GCE SDK to
@ -53,7 +50,6 @@ public class GceMetadataService extends AbstractLifecycleComponent {
/** Global instance of the HTTP transport. */
private HttpTransport gceHttpTransport;
@Inject
public GceMetadataService(Settings settings) {
super(settings);
}

View File

@ -19,6 +19,13 @@
package org.elasticsearch.discovery.gce;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import com.google.api.services.compute.model.AccessConfig;
import com.google.api.services.compute.model.Instance;
import com.google.api.services.compute.model.NetworkInterface;
@ -29,7 +36,6 @@ import org.elasticsearch.cloud.gce.GceInstancesService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Setting;
@ -40,13 +46,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
@ -75,7 +74,6 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas
private long lastRefresh;
private List<DiscoveryNode> cachedDiscoNodes;
@Inject
public GceUnicastHostsProvider(Settings settings, GceInstancesService gceInstancesService,
TransportService transportService,
NetworkService networkService) {

View File

@ -22,8 +22,11 @@ package org.elasticsearch.plugin.discovery.gce;
import com.google.api.client.http.HttpHeaders;
import com.google.api.client.util.ClassInfo;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cloud.gce.GceInstancesService;
import org.elasticsearch.cloud.gce.GceInstancesServiceImpl;
import org.elasticsearch.cloud.gce.GceMetadataService;
import org.elasticsearch.cloud.gce.GceModule;
import org.elasticsearch.cloud.gce.network.GceNameResolver;
@ -35,10 +38,14 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.gce.GceUnicastHostsProvider;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
@ -46,12 +53,16 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
public class GceDiscoveryPlugin extends Plugin implements DiscoveryPlugin {
public class GceDiscoveryPlugin extends Plugin implements DiscoveryPlugin, Closeable {
public static final String GCE = "gce";
private final Settings settings;
protected final Logger logger = Loggers.getLogger(GceDiscoveryPlugin.class);
// stashed when created in order to properly close
private final SetOnce<GceInstancesServiceImpl> gceInstancesService = new SetOnce<>();
static {
/*
@ -80,24 +91,18 @@ public class GceDiscoveryPlugin extends Plugin implements DiscoveryPlugin {
logger.trace("starting gce discovery plugin...");
}
@Override
public Collection<Module> createGuiceModules() {
return Collections.singletonList(new GceModule(settings));
}
@Override
@SuppressWarnings("rawtypes") // Supertype uses raw type
public Collection<Class<? extends LifecycleComponent>> getGuiceServiceClasses() {
logger.debug("Register gce compute service");
Collection<Class<? extends LifecycleComponent>> services = new ArrayList<>();
services.add(GceModule.getComputeServiceImpl());
return services;
}
public void onModule(DiscoveryModule discoveryModule) {
logger.debug("Register gce discovery type and gce unicast provider");
discoveryModule.addDiscoveryType(GCE, ZenDiscovery.class);
discoveryModule.addUnicastHostProvider(GCE, GceUnicastHostsProvider.class);
}
@Override
public Map<String, Supplier<UnicastHostsProvider>> getZenHostsProviders(TransportService transportService,
NetworkService networkService) {
return Collections.singletonMap(GCE, () -> {
gceInstancesService.set(new GceInstancesServiceImpl(settings));
return new GceUnicastHostsProvider(settings, gceInstancesService.get(), transportService, networkService);
});
}
@Override
@ -117,4 +122,9 @@ public class GceDiscoveryPlugin extends Plugin implements DiscoveryPlugin {
GceInstancesService.RETRY_SETTING,
GceInstancesService.MAX_WAIT_SETTING);
}
@Override
public void close() throws IOException {
IOUtils.close(gceInstancesService.get());
}
}

View File

@ -33,6 +33,7 @@ import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
@ -98,9 +99,9 @@ public class GceDiscoveryTests extends ESTestCase {
}
@After
public void stopGceComputeService() {
public void stopGceComputeService() throws IOException {
if (mock != null) {
mock.stop();
mock.close();
}
}