Use Azure Java Management SDK 0.7.0

This first version adds `azure-management` 0.7.0 instead of using our own XML implementation.
We can now have more control and give more options to the users.

We now support different keystore types using `cloud.azure.management.keystore.type`:

* `pkcs12`
* `jceks`
* `jks`

Closes #38

(cherry picked from commit 72c77d3)
(cherry picked from commit d2541ab)
This commit is contained in:
David Pilato 2015-02-04 17:11:56 +01:00
parent 42509d0da8
commit 85ac406f63
16 changed files with 406 additions and 556 deletions

View File

@ -323,12 +323,15 @@ And add the following lines:
# If you don't remember your account id, you may get it with `azure account list`
cloud:
azure:
keystore: /home/elasticsearch/azurekeystore.pkcs12
password: your_password_for_keystore
subscription_id: your_azure_subscription_id
service_name: your_azure_cloud_service_name
management:
subscription.id: your_azure_subscription_id
cloud.service.name: your_azure_cloud_service_name
keystore:
path: /home/elasticsearch/azurekeystore.pkcs12
password: your_password_for_keystore
discovery:
type: azure
type: azure
# Recommended (warning: non durable disk)
# path.data: /mnt/resource/elasticsearch/data

11
pom.xml
View File

@ -73,6 +73,17 @@ governing permissions and limitations under the License. -->
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-management-compute</artifactId>
<version>0.7.0</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-management</artifactId>
<version>0.7.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>

View File

@ -31,6 +31,8 @@ governing permissions and limitations under the License. -->
<useTransitiveFiltering>true</useTransitiveFiltering>
<includes>
<include>com.microsoft.azure:azure-storage</include>
<include>com.microsoft.azure:azure-management</include>
<include>com.microsoft.azure:azure-management-compute</include>
</includes>
</dependencySet>
</dependencySets>

View File

@ -0,0 +1,36 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.azure;
import org.elasticsearch.ElasticsearchIllegalStateException;
public class AzureServiceDisableException extends ElasticsearchIllegalStateException {
public AzureServiceDisableException() {
super(null);
}
public AzureServiceDisableException(String msg) {
super(msg);
}
public AzureServiceDisableException(String msg, Throwable cause) {
super(msg, cause);
}
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.azure;
import org.elasticsearch.ElasticsearchIllegalStateException;
public class AzureServiceRemoteException extends ElasticsearchIllegalStateException {
public AzureServiceRemoteException() {
super(null);
}
public AzureServiceRemoteException(String msg) {
super(msg);
}
public AzureServiceRemoteException(String msg, Throwable cause) {
super(msg, cause);
}
}

View File

@ -31,9 +31,13 @@ public class AzureSettingsFilter implements SettingsFilter.Filter {
@Override
public void filter(ImmutableSettings.Builder settings) {
// Cloud global settings
settings.remove("cloud.azure." + AzureComputeService.Fields.REFRESH);
// Cloud management API settings
settings.remove("cloud.azure.management." + AzureComputeService.Fields.KEYSTORE_PATH);
settings.remove("cloud.azure.management." + AzureComputeService.Fields.KEYSTORE_PASSWORD);
settings.remove("cloud.azure.management." + AzureComputeService.Fields.KEYSTORE_TYPE);
settings.remove("cloud.azure.management." + AzureComputeService.Fields.SUBSCRIPTION_ID);
settings.remove("cloud.azure.management." + AzureComputeService.Fields.SERVICE_NAME);

View File

@ -1,113 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.azure;
/**
* Define an Azure Instance
*/
public class Instance {
public static enum Status {
STARTED
}
private String privateIp;
private String publicIp;
private String publicPort;
private Status status;
private String name;
public String getPrivateIp() {
return privateIp;
}
public void setPrivateIp(String privateIp) {
this.privateIp = privateIp;
}
public Status getStatus() {
return status;
}
public void setStatus(Status status) {
this.status = status;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPublicIp() {
return publicIp;
}
public void setPublicIp(String publicIp) {
this.publicIp = publicIp;
}
public String getPublicPort() {
return publicPort;
}
public void setPublicPort(String publicPort) {
this.publicPort = publicPort;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Instance instance = (Instance) o;
if (name != null ? !name.equals(instance.name) : instance.name != null) return false;
if (privateIp != null ? !privateIp.equals(instance.privateIp) : instance.privateIp != null) return false;
if (publicIp != null ? !publicIp.equals(instance.publicIp) : instance.publicIp != null) return false;
if (publicPort != null ? !publicPort.equals(instance.publicPort) : instance.publicPort != null) return false;
if (status != instance.status) return false;
return true;
}
@Override
public int hashCode() {
int result = privateIp != null ? privateIp.hashCode() : 0;
result = 31 * result + (publicIp != null ? publicIp.hashCode() : 0);
result = 31 * result + (publicPort != null ? publicPort.hashCode() : 0);
result = 31 * result + (status != null ? status.hashCode() : 0);
result = 31 * result + (name != null ? name.hashCode() : 0);
return result;
}
@Override
public String toString() {
final StringBuffer sb = new StringBuffer("Instance{");
sb.append("privateIp='").append(privateIp).append('\'');
sb.append(", publicIp='").append(publicIp).append('\'');
sb.append(", publicPort='").append(publicPort).append('\'');
sb.append(", status=").append(status);
sb.append(", name='").append(name).append('\'');
sb.append('}');
return sb.toString();
}
}

View File

@ -19,9 +19,7 @@
package org.elasticsearch.cloud.azure.management;
import org.elasticsearch.cloud.azure.Instance;
import java.util.Set;
import com.microsoft.windowsazure.management.compute.models.HostedServiceGetDetailedResponse;
/**
*
@ -49,6 +47,7 @@ public interface AzureComputeService {
// Keystore settings
public static final String KEYSTORE_PATH = "keystore.path";
public static final String KEYSTORE_PASSWORD = "keystore.password";
public static final String KEYSTORE_TYPE = "keystore.type";
public static final String REFRESH = "refresh_interval";
@ -56,5 +55,5 @@ public interface AzureComputeService {
public static final String ENDPOINT_NAME = "endpoint.name";
}
public Set<Instance> instances();
public HostedServiceGetDetailedResponse getServiceDetails();
}

View File

@ -19,38 +19,24 @@
package org.elasticsearch.cloud.azure.management;
import com.microsoft.windowsazure.Configuration;
import com.microsoft.windowsazure.core.utils.KeyStoreType;
import com.microsoft.windowsazure.management.compute.ComputeManagementClient;
import com.microsoft.windowsazure.management.compute.ComputeManagementService;
import com.microsoft.windowsazure.management.compute.models.HostedServiceGetDetailedResponse;
import com.microsoft.windowsazure.management.configuration.ManagementConfiguration;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cloud.azure.AzureServiceDisableException;
import org.elasticsearch.cloud.azure.AzureServiceRemoteException;
import org.elasticsearch.cloud.azure.AzureSettingsFilter;
import org.elasticsearch.cloud.azure.Instance;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.security.*;
import java.security.cert.CertificateException;
import java.util.HashSet;
import java.util.Set;
import java.net.URI;
import java.net.URISyntaxException;
/**
*
@ -60,140 +46,57 @@ public class AzureComputeServiceImpl extends AbstractLifecycleComponent<AzureCom
static final class Azure {
private static final String ENDPOINT = "https://management.core.windows.net/";
private static final String VERSION = "2013-03-01";
}
private SSLSocketFactory socketFactory;
private final String keystorePath;
private final String keystorePassword;
private final String subscriptionId;
private final ComputeManagementClient computeManagementClient;
private final String serviceName;
private final String publicEndpointName;
@Inject
public AzureComputeServiceImpl(Settings settings, SettingsFilter settingsFilter) {
super(settings);
settingsFilter.addFilter(new AzureSettingsFilter());
// Creating socketFactory
subscriptionId = componentSettings.get(Fields.SUBSCRIPTION_ID, settings.get("cloud.azure." + Fields.SUBSCRIPTION_ID_DEPRECATED));
String subscriptionId = componentSettings.get(Fields.SUBSCRIPTION_ID, settings.get("cloud.azure." + Fields.SUBSCRIPTION_ID_DEPRECATED));
serviceName = componentSettings.get(Fields.SERVICE_NAME, settings.get("cloud.azure." + Fields.SERVICE_NAME_DEPRECATED));
keystorePath = componentSettings.get(Fields.KEYSTORE_PATH, settings.get("cloud.azure." + Fields.KEYSTORE_DEPRECATED));
keystorePassword = componentSettings.get(Fields.KEYSTORE_PASSWORD, settings.get("cloud.azure." + Fields.PASSWORD_DEPRECATED));
// TODO Remove in 3.0.0
String portName = settings.get("cloud.azure." + Fields.PORT_NAME_DEPRECATED);
if (portName != null) {
logger.warn("setting [cloud.azure.{}] has been deprecated. please replace with [discovery.azure.{}].",
Fields.PORT_NAME_DEPRECATED, Fields.ENDPOINT_NAME);
this.publicEndpointName = portName;
} else {
this.publicEndpointName = componentSettings.get(Fields.ENDPOINT_NAME, "elasticsearch");
String keystorePath = componentSettings.get(Fields.KEYSTORE_PATH, settings.get("cloud.azure." + Fields.KEYSTORE_DEPRECATED));
String keystorePassword = componentSettings.get(Fields.KEYSTORE_PASSWORD, settings.get("cloud.azure." + Fields.PASSWORD_DEPRECATED));
String strKeyStoreType = componentSettings.get(Fields.KEYSTORE_TYPE, KeyStoreType.pkcs12.name());
KeyStoreType tmpKeyStoreType = KeyStoreType.pkcs12;
try {
tmpKeyStoreType = KeyStoreType.fromString(strKeyStoreType);
} catch (Exception e) {
logger.warn("wrong value for [{}]: [{}]. falling back to [{}]...", Fields.KEYSTORE_TYPE,
strKeyStoreType, KeyStoreType.pkcs12.name());
}
KeyStoreType keystoreType = tmpKeyStoreType;
// Check that we have all needed properties
Configuration configuration;
try {
socketFactory = getSocketFactory(keystorePath, keystorePassword);
logger.trace("creating new Azure client for [{}], [{}], [{}]", subscriptionId, serviceName, portName);
} catch (Exception e) {
// Can not start Azure Client
configuration = ManagementConfiguration.configure(new URI(Azure.ENDPOINT),
subscriptionId, keystorePath, keystorePassword, keystoreType);
} catch (IOException|URISyntaxException e) {
logger.error("can not start azure client: {}", e.getMessage());
socketFactory = null;
computeManagementClient = null;
return;
}
}
private InputStream getXML(String api) throws UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, IOException, CertificateException, KeyManagementException {
String https_url = Azure.ENDPOINT + subscriptionId + api;
URL url = new URL( https_url );
HttpsURLConnection con = (HttpsURLConnection) url.openConnection();
con.setSSLSocketFactory( socketFactory );
con.setRequestProperty("x-ms-version", Azure.VERSION);
logger.debug("calling azure REST API: {}", api);
logger.trace("get {} from azure", https_url);
return con.getInputStream();
logger.trace("creating new Azure client for [{}], [{}]", subscriptionId, serviceName);
computeManagementClient = ComputeManagementService.create(configuration);
}
@Override
public Set<Instance> instances() {
if (socketFactory == null) {
public HostedServiceGetDetailedResponse getServiceDetails() {
if (computeManagementClient == null) {
// Azure plugin is disabled
logger.trace("azure plugin is disabled. Returning an empty list of nodes.");
return new HashSet<>();
} else {
try {
InputStream stream = getXML("/services/hostedservices/" + serviceName + "?embed-detail=true");
Set<Instance> instances = buildInstancesFromXml(stream, publicEndpointName);
logger.trace("get instances from azure: {}", instances);
return instances;
} catch (ParserConfigurationException | XPathExpressionException | SAXException e) {
logger.warn("can not parse XML response: {}", e.getMessage());
} catch (Exception e) {
logger.warn("can not get list of azure nodes: {}", e.getMessage());
}
}
return new HashSet<>();
}
private static String extractValueFromPath(Node node, String path) throws XPathExpressionException {
XPath xPath = XPathFactory.newInstance().newXPath();
Node subnode = (Node) xPath.compile(path).evaluate(node, XPathConstants.NODE);
return subnode.getFirstChild().getNodeValue();
}
public static Set<Instance> buildInstancesFromXml(InputStream inputStream, String portName) throws ParserConfigurationException, IOException, SAXException, XPathExpressionException {
Set<Instance> instances = new HashSet<>();
DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
Document doc = dBuilder.parse(inputStream);
doc.getDocumentElement().normalize();
XPath xPath = XPathFactory.newInstance().newXPath();
// We only fetch Started nodes (TODO: should we start with all nodes whatever the status is?)
String expression = "/HostedService/Deployments/Deployment/RoleInstanceList/RoleInstance[PowerState='Started']";
NodeList nodeList = (NodeList) xPath.compile(expression).evaluate(doc, XPathConstants.NODESET);
for (int i = 0; i < nodeList.getLength(); i++) {
Instance instance = new Instance();
Node node = nodeList.item(i);
instance.setPrivateIp(extractValueFromPath(node, "IpAddress"));
instance.setName(extractValueFromPath(node, "InstanceName"));
instance.setStatus(Instance.Status.STARTED);
// Let's digg into <InstanceEndpoints>
expression = "InstanceEndpoints/InstanceEndpoint[Name='"+ portName +"']";
NodeList endpoints = (NodeList) xPath.compile(expression).evaluate(node, XPathConstants.NODESET);
for (int j = 0; j < endpoints.getLength(); j++) {
Node endpoint = endpoints.item(j);
instance.setPublicIp(extractValueFromPath(endpoint, "Vip"));
instance.setPublicPort(extractValueFromPath(endpoint, "PublicPort"));
}
instances.add(instance);
throw new AzureServiceDisableException("azure plugin is disabled.");
}
return instances;
}
private SSLSocketFactory getSocketFactory(String keystore, String password) throws NoSuchAlgorithmException, KeyStoreException, IOException, CertificateException, UnrecoverableKeyException, KeyManagementException {
File pKeyFile = new File(keystore);
KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance("SunX509");
KeyStore keyStore = KeyStore.getInstance("PKCS12");
InputStream keyInput = new FileInputStream(pKeyFile);
keyStore.load(keyInput, password.toCharArray());
keyInput.close();
keyManagerFactory.init(keyStore, password.toCharArray());
SSLContext context = SSLContext.getInstance("TLS");
context.init(keyManagerFactory.getKeyManagers(), null, new SecureRandom());
return context.getSocketFactory();
try {
return computeManagementClient.getHostedServicesOperations().getDetailed(serviceName);
} catch (Exception e) {
throw new AzureServiceRemoteException("can not get list of azure nodes", e);
}
}
@Override
@ -206,5 +109,12 @@ public class AzureComputeServiceImpl extends AbstractLifecycleComponent<AzureCom
@Override
protected void doClose() throws ElasticsearchException {
if (computeManagementClient != null) {
try {
computeManagementClient.close();
} catch (IOException e) {
logger.error("error while closing Azure client", e);
}
}
}
}

View File

@ -19,11 +19,12 @@
package org.elasticsearch.discovery.azure;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import com.microsoft.windowsazure.management.compute.models.*;
import org.elasticsearch.Version;
import org.elasticsearch.cloud.azure.AzureServiceDisableException;
import org.elasticsearch.cloud.azure.AzureServiceRemoteException;
import org.elasticsearch.cloud.azure.management.AzureComputeService;
import org.elasticsearch.cloud.azure.management.AzureComputeService.Fields;
import org.elasticsearch.cloud.azure.Instance;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.component.AbstractComponent;
@ -38,7 +39,6 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.net.InetAddress;
import java.util.List;
import java.util.Set;
/**
*
@ -59,7 +59,10 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic
private final TimeValue refreshInterval;
private long lastRefresh;
private List<DiscoveryNode> cachedDiscoNodes;
private final HostType host_type;
private final HostType hostType;
private final String publicEndpointName;
private final String deploymentName;
private final DeploymentSlot deploymentSlot;
@Inject
public AzureUnicastHostsProvider(Settings settings, AzureComputeService azureComputeService,
@ -74,8 +77,30 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic
this.refreshInterval = componentSettings.getAsTime(Fields.REFRESH,
settings.getAsTime("cloud.azure." + Fields.REFRESH, TimeValue.timeValueSeconds(0)));
this.host_type = HostType.valueOf(componentSettings.get(Fields.HOST_TYPE,
settings.get("cloud.azure." + Fields.HOST_TYPE_DEPRECATED, HostType.PRIVATE_IP.name())).toUpperCase());
HostType tmpHostType;
String strHostType = componentSettings.get(Fields.HOST_TYPE,
settings.get("cloud.azure." + Fields.HOST_TYPE_DEPRECATED, HostType.PRIVATE_IP.name())).toUpperCase();
try {
tmpHostType = HostType.valueOf(strHostType);
} catch (IllegalArgumentException e) {
logger.warn("wrong value for [{}]: [{}]. falling back to [{}]...", Fields.HOST_TYPE,
strHostType, HostType.PRIVATE_IP.name().toLowerCase());
tmpHostType = HostType.PRIVATE_IP;
}
this.hostType = tmpHostType;
// TODO Remove in 3.0.0
String portName = settings.get("cloud.azure." + Fields.PORT_NAME_DEPRECATED);
if (portName != null) {
logger.warn("setting [{}] has been deprecated. please replace with [{}].", Fields.PORT_NAME_DEPRECATED, Fields.ENDPOINT_NAME);
this.publicEndpointName = portName;
} else {
this.publicEndpointName = componentSettings.get(Fields.ENDPOINT_NAME, "elasticsearch");
}
this.deploymentName = componentSettings.get(Fields.SERVICE_NAME, settings.get("cloud.azure." + Fields.SERVICE_NAME_DEPRECATED));
this.deploymentSlot = DeploymentSlot.Production;
}
/**
@ -98,60 +123,110 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic
cachedDiscoNodes = Lists.newArrayList();
Set<Instance> response = azureComputeService.instances();
String ipAddress = null;
HostedServiceGetDetailedResponse detailed;
try {
InetAddress inetAddress = networkService.resolvePublishHostAddress(null);
if (inetAddress != null) {
ipAddress = inetAddress.getHostAddress();
}
logger.trace("ipAddress found: [{}]", ipAddress);
} catch (IOException e) {
// We can't find the publish host address... Hmmm. Too bad :-(
logger.trace("exception while finding ipAddress", e);
detailed = azureComputeService.getServiceDetails();
} catch (AzureServiceDisableException e) {
logger.debug("Azure discovery service has been disabled. Returning empty list of nodes.");
return cachedDiscoNodes;
} catch (AzureServiceRemoteException e) {
// We got a remote exception
logger.warn("can not get list of azure nodes: [{}]. Returning empty list of nodes.", e.getMessage());
logger.trace("AzureServiceRemoteException caught", e);
return cachedDiscoNodes;
}
InetAddress ipAddress = null;
try {
for (Instance instance : response) {
ipAddress = networkService.resolvePublishHostAddress(null);
logger.trace("ip of current node: [{}]", ipAddress);
} catch (IOException e) {
// We can't find the publish host address... Hmmm. Too bad :-(
logger.trace("exception while finding ip", e);
}
for (HostedServiceGetDetailedResponse.Deployment deployment : detailed.getDeployments()) {
// We check the deployment slot
if (deployment.getDeploymentSlot() != deploymentSlot) {
logger.debug("current deployment slot [{}] for [{}] is different from [{}]. skipping...",
deployment.getDeploymentSlot(), deployment.getName(), deploymentSlot);
continue;
}
// If provided, we check the deployment name
if (deploymentName != null && !deploymentName.equals(deployment.getName())) {
logger.debug("current deployment name [{}] different from [{}]. skipping...",
deployment.getName(), deploymentName);
continue;
}
// We check current deployment status
if (deployment.getStatus() != DeploymentStatus.Starting &&
deployment.getStatus() != DeploymentStatus.Deploying &&
deployment.getStatus() != DeploymentStatus.Running) {
logger.debug("[{}] status is [{}]. skipping...",
deployment.getName(), deployment.getStatus());
continue;
}
// In other case, it should be the right deployment so we can add it to the list of instances
for (RoleInstance instance : deployment.getRoleInstances()) {
String networkAddress = null;
// Let's detect if we want to use public or private IP
if (host_type == HostType.PRIVATE_IP) {
if (instance.getPrivateIp() != null) {
if (instance.getPrivateIp().equals(ipAddress)) {
logger.trace("adding ourselves {}", ipAddress);
switch (hostType) {
case PRIVATE_IP:
InetAddress privateIp = instance.getIPAddress();
if (privateIp != null) {
if (privateIp.equals(ipAddress)) {
logger.trace("adding ourselves {}", ipAddress);
}
networkAddress = privateIp.getHostAddress();
} else {
logger.trace("no private ip provided. ignoring [{}]...", instance.getInstanceName());
}
networkAddress = instance.getPrivateIp();
} else {
logger.trace("no private ip provided ignoring {}", instance.getName());
}
}
if (host_type == HostType.PUBLIC_IP) {
if (instance.getPublicIp() != null && instance.getPublicPort() != null) {
networkAddress = instance.getPublicIp() + ":" +instance.getPublicPort();
} else {
logger.trace("no public ip provided ignoring {}", instance.getName());
}
break;
case PUBLIC_IP:
for (InstanceEndpoint endpoint : instance.getInstanceEndpoints()) {
if (!publicEndpointName.equals(endpoint.getName())) {
logger.trace("ignoring endpoint [{}] as different than [{}]",
endpoint.getName(), publicEndpointName);
continue;
}
networkAddress = endpoint.getVirtualIPAddress().getHostAddress() + ":" + endpoint.getPort();
}
if (networkAddress == null) {
logger.trace("no public ip provided. ignoring [{}]...", instance.getInstanceName());
}
break;
default:
// This could never happen!
logger.warn("undefined host_type [{}]. Please check your settings.", hostType);
return cachedDiscoNodes;
}
if (networkAddress == null) {
// We have a bad parameter here or not enough information from azure
throw new ElasticsearchIllegalArgumentException("can't find any " + host_type.name() + " address");
} else {
logger.warn("no network address found. ignoring [{}]...", instance.getInstanceName());
continue;
}
try {
TransportAddress[] addresses = transportService.addressesFromString(networkAddress);
// we only limit to 1 addresses, makes no sense to ping 100 ports
logger.trace("adding {}, transport_address {}", networkAddress, addresses[0]);
cachedDiscoNodes.add(new DiscoveryNode("#cloud-" + instance.getName(), addresses[0], version.minimumCompatibilityVersion()));
cachedDiscoNodes.add(new DiscoveryNode("#cloud-" + instance.getInstanceName(), addresses[0],
version.minimumCompatibilityVersion()));
} catch (Exception e) {
logger.warn("can not convert [{}] to transport address. skipping. [{}]", networkAddress, e.getMessage());
}
}
} catch (Throwable e) {
logger.warn("Exception caught during discovery {} : {}", e.getClass().getName(), e.getMessage());
logger.trace("Exception caught during discovery", e);
}
logger.debug("{} node(s) added", cachedDiscoNodes.size());
logger.debug("using dynamic discovery nodes {}", cachedDiscoNodes);
return cachedDiscoNodes;
}

View File

@ -19,12 +19,12 @@
package org.elasticsearch.cloud.azure.management;
import org.elasticsearch.cloud.azure.Instance;
import com.microsoft.windowsazure.management.compute.models.*;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import java.util.HashSet;
import java.util.Set;
import java.net.InetAddress;
/**
* Mock Azure API with a single started node
@ -37,12 +37,32 @@ public class AzureComputeServiceSimpleMock extends AzureComputeServiceAbstractMo
}
@Override
public Set<Instance> instances() {
Set<Instance> instances = new HashSet<>();
Instance azureHost = new Instance();
azureHost.setPrivateIp("127.0.0.1");
instances.add(azureHost);
public HostedServiceGetDetailedResponse getServiceDetails() {
HostedServiceGetDetailedResponse response = new HostedServiceGetDetailedResponse();
HostedServiceGetDetailedResponse.Deployment deployment = new HostedServiceGetDetailedResponse.Deployment();
return instances;
// Fake the deployment
deployment.setName("dummy");
deployment.setDeploymentSlot(DeploymentSlot.Production);
deployment.setStatus(DeploymentStatus.Running);
// Fake an instance
RoleInstance instance = new RoleInstance();
instance.setInstanceName("dummy1");
// Fake the private IP
instance.setIPAddress(InetAddress.getLoopbackAddress());
// Fake the public IP
InstanceEndpoint endpoint = new InstanceEndpoint();
endpoint.setName("elasticsearch");
endpoint.setVirtualIPAddress(InetAddress.getLoopbackAddress());
endpoint.setPort(9400);
instance.setInstanceEndpoints(Lists.newArrayList(endpoint));
deployment.setRoleInstances(Lists.newArrayList(instance));
response.setDeployments(Lists.newArrayList(deployment));
return response;
}
}

View File

@ -19,32 +19,69 @@
package org.elasticsearch.cloud.azure.management;
import org.elasticsearch.cloud.azure.Instance;
import com.microsoft.windowsazure.management.compute.models.*;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import java.util.HashSet;
import java.util.Set;
import java.net.InetAddress;
/**
* Mock Azure API with two started nodes
*/
public class AzureComputeServiceTwoNodesMock extends AzureComputeServiceAbstractMock {
NetworkService networkService;
@Inject
protected AzureComputeServiceTwoNodesMock(Settings settings) {
protected AzureComputeServiceTwoNodesMock(Settings settings, NetworkService networkService) {
super(settings);
this.networkService = networkService;
}
@Override
public Set<Instance> instances() {
Set<Instance> instances = new HashSet<>();
Instance azureHost = new Instance();
azureHost.setPrivateIp("127.0.0.1");
instances.add(azureHost);
azureHost = new Instance();
azureHost.setPrivateIp("127.0.0.1");
instances.add(azureHost);
return instances;
public HostedServiceGetDetailedResponse getServiceDetails() {
HostedServiceGetDetailedResponse response = new HostedServiceGetDetailedResponse();
HostedServiceGetDetailedResponse.Deployment deployment = new HostedServiceGetDetailedResponse.Deployment();
// Fake the deployment
deployment.setName("dummy");
deployment.setDeploymentSlot(DeploymentSlot.Production);
deployment.setStatus(DeploymentStatus.Running);
// Fake a first instance
RoleInstance instance1 = new RoleInstance();
instance1.setInstanceName("dummy1");
// Fake the private IP
instance1.setIPAddress(InetAddress.getLoopbackAddress());
// Fake the public IP
InstanceEndpoint endpoint1 = new InstanceEndpoint();
endpoint1.setName("elasticsearch");
endpoint1.setVirtualIPAddress(InetAddress.getLoopbackAddress());
endpoint1.setPort(9400);
instance1.setInstanceEndpoints(Lists.newArrayList(endpoint1));
// Fake a first instance
RoleInstance instance2 = new RoleInstance();
instance2.setInstanceName("dummy1");
// Fake the private IP
instance2.setIPAddress(InetAddress.getLoopbackAddress());
// Fake the public IP
InstanceEndpoint endpoint2 = new InstanceEndpoint();
endpoint2.setName("elasticsearch");
endpoint2.setVirtualIPAddress(InetAddress.getLoopbackAddress());
endpoint2.setPort(9401);
instance2.setInstanceEndpoints(Lists.newArrayList(endpoint2));
deployment.setRoleInstances(Lists.newArrayList(instance1, instance2));
response.setDeployments(Lists.newArrayList(deployment));
return response;
}
}

View File

@ -1,61 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.azure;
import org.elasticsearch.cloud.azure.Instance;
import org.elasticsearch.cloud.azure.management.AzureComputeServiceImpl;
import org.junit.Assert;
import org.junit.Test;
import org.xml.sax.SAXException;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.xpath.XPathExpressionException;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Set;
public class AzureInstanceXmlParserTest {
private Instance build(String name, String privateIpAddress,
String publicIpAddress,
String publicPort,
Instance.Status status) {
Instance instance = new Instance();
instance.setName(name);
instance.setPrivateIp(privateIpAddress);
instance.setPublicIp(publicIpAddress);
instance.setPublicPort(publicPort);
instance.setStatus(status);
return instance;
}
@Test
public void testReadXml() throws ParserConfigurationException, SAXException, XPathExpressionException, IOException {
InputStream inputStream = AzureInstanceXmlParserTest.class.getResourceAsStream("/org/elasticsearch/azure/test/services.xml");
Set<Instance> instances = AzureComputeServiceImpl.buildInstancesFromXml(inputStream, "elasticsearch");
Set<Instance> expected = new HashSet<>();
expected.add(build("es-windows2008", "10.53.250.55", null, null, Instance.Status.STARTED));
expected.add(build("myesnode1", "10.53.218.75", "137.116.213.150", "9300", Instance.Status.STARTED));
Assert.assertArrayEquals(expected.toArray(), instances.toArray());
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.discovery.azure;
import org.elasticsearch.cloud.azure.management.AzureComputeServiceSimpleMock;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
@ -36,9 +37,44 @@ public class AzureSimpleTest extends AbstractAzureComputeServiceTest {
}
@Test
public void one_node_should_run() {
public void one_node_should_run_using_private_ip() {
ImmutableSettings.Builder settings = ImmutableSettings.settingsBuilder()
.put("cloud.azure.service_name", "dummy")
.put("cloud.azure.host_type", "private_ip")
.put(super.settingsBuilder());
logger.info("--> start one node");
internalCluster().startNode(settingsBuilder());
internalCluster().startNode(settings);
assertThat(client().admin().cluster().prepareState().setMasterNodeTimeout("1s").execute().actionGet().getState().nodes().masterNodeId(), notNullValue());
// We expect having 1 node as part of the cluster, let's test that
checkNumberOfNodes(1);
}
@Test
public void one_node_should_run_using_public_ip() {
ImmutableSettings.Builder settings = ImmutableSettings.settingsBuilder()
.put("cloud.azure.service_name", "dummy")
.put("cloud.azure.host_type", "public_ip")
.put(super.settingsBuilder());
logger.info("--> start one node");
internalCluster().startNode(settings);
assertThat(client().admin().cluster().prepareState().setMasterNodeTimeout("1s").execute().actionGet().getState().nodes().masterNodeId(), notNullValue());
// We expect having 1 node as part of the cluster, let's test that
checkNumberOfNodes(1);
}
@Test
public void one_node_should_run_using_wrong_settings() {
ImmutableSettings.Builder settings = ImmutableSettings.settingsBuilder()
.put("cloud.azure.service_name", "dummy")
.put("cloud.azure.host_type", "do_not_exist")
.put(super.settingsBuilder());
logger.info("--> start one node");
internalCluster().startNode(settings);
assertThat(client().admin().cluster().prepareState().setMasterNodeTimeout("1s").execute().actionGet().getState().nodes().masterNodeId(), notNullValue());
// We expect having 1 node as part of the cluster, let's test that

View File

@ -20,6 +20,7 @@
package org.elasticsearch.discovery.azure;
import org.elasticsearch.cloud.azure.management.AzureComputeServiceTwoNodesMock;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
@ -36,13 +37,37 @@ public class AzureTwoStartedNodesTest extends AbstractAzureComputeServiceTest {
}
@Test
public void two_nodes_should_run() {
public void two_nodes_should_run_using_private_ip() {
ImmutableSettings.Builder settings = ImmutableSettings.settingsBuilder()
.put("cloud.azure.service_name", "dummy")
.put("cloud.azure.host_type", "private_ip")
.put(super.settingsBuilder());
logger.info("--> start first node");
internalCluster().startNode(settingsBuilder());
internalCluster().startNode(settings);
assertThat(client().admin().cluster().prepareState().setMasterNodeTimeout("1s").execute().actionGet().getState().nodes().masterNodeId(), notNullValue());
logger.info("--> start another node");
internalCluster().startNode(settingsBuilder());
internalCluster().startNode(settings);
assertThat(client().admin().cluster().prepareState().setMasterNodeTimeout("1s").execute().actionGet().getState().nodes().masterNodeId(), notNullValue());
// We expect having 2 nodes as part of the cluster, let's test that
checkNumberOfNodes(2);
}
@Test
public void two_nodes_should_run_using_public_ip() {
ImmutableSettings.Builder settings = ImmutableSettings.settingsBuilder()
.put("cloud.azure.service_name", "dummy")
.put("cloud.azure.host_type", "public_ip")
.put(super.settingsBuilder());
logger.info("--> start first node");
internalCluster().startNode(settings);
assertThat(client().admin().cluster().prepareState().setMasterNodeTimeout("1s").execute().actionGet().getState().nodes().masterNodeId(), notNullValue());
logger.info("--> start another node");
internalCluster().startNode(settings);
assertThat(client().admin().cluster().prepareState().setMasterNodeTimeout("1s").execute().actionGet().getState().nodes().masterNodeId(), notNullValue());
// We expect having 2 nodes as part of the cluster, let's test that

View File

@ -1,170 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<HostedService xmlns="http://schemas.microsoft.com/windowsazure" xmlns:i="http://www.w3.org/2001/XMLSchema-instance">
<Url>https://management.core.windows.net/a836d210-c681-4916-ae7d-9ca5f1b4906f/services/hostedservices/azure-elasticsearch-cluster</Url>
<ServiceName>azure-elasticsearch-cluster</ServiceName>
<HostedServiceProperties>
<Description>Implicitly created hosted service</Description>
<Location>West Europe</Location>
<Label>YXp1cmUtZWxhc3RpY3NlYXJjaC1jbHVzdGVy</Label>
<Status>Created</Status>
<DateCreated>2013-09-23T12:45:45Z</DateCreated>
<DateLastModified>2013-09-23T13:42:46Z</DateLastModified>
<ExtendedProperties />
</HostedServiceProperties>
<Deployments>
<Deployment>
<Name>azure-elasticsearch-cluster</Name>
<DeploymentSlot>Production</DeploymentSlot>
<PrivateID>c6a690796e6b497a918487546193b94a</PrivateID>
<Status>Running</Status>
<Label>WVhwMWNtVXRaV3hoYzNScFkzTmxZWEpqYUMxamJIVnpkR1Z5</Label>
<Url>http://azure-elasticsearch-cluster.cloudapp.net/</Url>
<Configuration>PFNlcnZpY2VDb25maWd1cmF0aW9uIHhtbG5zOnhzZD0iaHR0cDovL3d3dy53My5vcmcvMjAwMS9YTUxTY2hlbWEiIHhtbG5zOnhzaT0iaHR0cDovL3d3dy53My5vcmcvMjAwMS9YTUxTY2hlbWEtaW5zdGFuY2UiIHhtbG5zPSJodHRwOi8vc2NoZW1hcy5taWNyb3NvZnQuY29tL1NlcnZpY2VIb3N0aW5nLzIwMDgvMTAvU2VydmljZUNvbmZpZ3VyYXRpb24iPg0KICA8Um9sZSBuYW1lPSJlcy13aW5kb3dzMjAwOCI+DQogICAgPEluc3RhbmNlcyBjb3VudD0iMSIgLz4NCiAgPC9Sb2xlPg0KICA8Um9sZSBuYW1lPSJteWVzbm9kZTEiPg0KICAgIDxJbnN0YW5jZXMgY291bnQ9IjEiIC8+DQogIDwvUm9sZT4NCjwvU2VydmljZUNvbmZpZ3VyYXRpb24+</Configuration>
<RoleInstanceList>
<RoleInstance>
<RoleName>es-windows2008</RoleName>
<InstanceName>es-windows2008</InstanceName>
<InstanceStatus>ReadyRole</InstanceStatus>
<InstanceUpgradeDomain>0</InstanceUpgradeDomain>
<InstanceFaultDomain>0</InstanceFaultDomain>
<InstanceSize>ExtraSmall</InstanceSize>
<InstanceStateDetails />
<IpAddress>10.53.250.55</IpAddress>
<InstanceEndpoints>
<InstanceEndpoint>
<Name>PowerShell</Name>
<Vip>137.116.213.150</Vip>
<PublicPort>5986</PublicPort>
<LocalPort>5986</LocalPort>
<Protocol>tcp</Protocol>
</InstanceEndpoint>
<InstanceEndpoint>
<Name>Remote Desktop</Name>
<Vip>137.116.213.150</Vip>
<PublicPort>53867</PublicPort>
<LocalPort>3389</LocalPort>
<Protocol>tcp</Protocol>
</InstanceEndpoint>
</InstanceEndpoints>
<PowerState>Started</PowerState>
<HostName>es-windows2008</HostName>
<RemoteAccessCertificateThumbprint>E6798DEACEAD4752825A2ABFC0A9A367389ED802</RemoteAccessCertificateThumbprint>
</RoleInstance>
<RoleInstance>
<RoleName>myesnode1</RoleName>
<InstanceName>myesnode1</InstanceName>
<InstanceStatus>ReadyRole</InstanceStatus>
<InstanceUpgradeDomain>0</InstanceUpgradeDomain>
<InstanceFaultDomain>0</InstanceFaultDomain>
<InstanceSize>ExtraSmall</InstanceSize>
<InstanceStateDetails />
<IpAddress>10.53.218.75</IpAddress>
<InstanceEndpoints>
<InstanceEndpoint>
<Name>ssh</Name>
<Vip>137.116.213.150</Vip>
<PublicPort>22</PublicPort>
<LocalPort>22</LocalPort>
<Protocol>tcp</Protocol>
</InstanceEndpoint>
<InstanceEndpoint>
<Name>elasticsearch</Name>
<Vip>137.116.213.150</Vip>
<PublicPort>9300</PublicPort>
<LocalPort>9300</LocalPort>
<Protocol>tcp</Protocol>
</InstanceEndpoint>
</InstanceEndpoints>
<PowerState>Started</PowerState>
<HostName>myesnode1</HostName>
</RoleInstance>
</RoleInstanceList>
<UpgradeDomainCount>1</UpgradeDomainCount>
<RoleList>
<Role i:type="PersistentVMRole">
<RoleName>es-windows2008</RoleName>
<OsVersion />
<RoleType>PersistentVMRole</RoleType>
<ConfigurationSets>
<ConfigurationSet i:type="NetworkConfigurationSet">
<ConfigurationSetType>NetworkConfiguration</ConfigurationSetType>
<InputEndpoints>
<InputEndpoint>
<LocalPort>5986</LocalPort>
<Name>PowerShell</Name>
<Port>5986</Port>
<Protocol>tcp</Protocol>
<Vip>137.116.213.150</Vip>
</InputEndpoint>
<InputEndpoint>
<LocalPort>3389</LocalPort>
<Name>Remote Desktop</Name>
<Port>53867</Port>
<Protocol>tcp</Protocol>
<Vip>137.116.213.150</Vip>
</InputEndpoint>
</InputEndpoints>
<SubnetNames />
</ConfigurationSet>
</ConfigurationSets>
<DataVirtualHardDisks />
<OSVirtualHardDisk>
<HostCaching>ReadWrite</HostCaching>
<DiskName>azure-elasticsearch-cluster-es-windows2008-0-201309231342520150</DiskName>
<MediaLink>http://portalvhdsv5skghwlmf765.blob.core.windows.net/vhds/azure-elasticsearch-cluster-es-windows2008-2013-09-23.vhd</MediaLink>
<SourceImageName>a699494373c04fc0bc8f2bb1389d6106__Win2K8R2SP1-Datacenter-201308.02-en.us-127GB.vhd</SourceImageName>
<OS>Windows</OS>
</OSVirtualHardDisk>
<RoleSize>ExtraSmall</RoleSize>
</Role>
<Role i:type="PersistentVMRole">
<RoleName>myesnode1</RoleName>
<OsVersion />
<RoleType>PersistentVMRole</RoleType>
<ConfigurationSets>
<ConfigurationSet i:type="NetworkConfigurationSet">
<ConfigurationSetType>NetworkConfiguration</ConfigurationSetType>
<InputEndpoints>
<InputEndpoint>
<LocalPort>22</LocalPort>
<Name>ssh</Name>
<Port>22</Port>
<Protocol>tcp</Protocol>
<Vip>137.116.213.150</Vip>
</InputEndpoint>
</InputEndpoints>
<SubnetNames />
</ConfigurationSet>
</ConfigurationSets>
<DataVirtualHardDisks />
<OSVirtualHardDisk>
<HostCaching>ReadWrite</HostCaching>
<DiskName>azure-elasticsearch-cluster-myesnode1-0-201309231246380270</DiskName>
<MediaLink>http://azureelasticsearchcluste.blob.core.windows.net/vhd-store/myesnode1-0cc86cc84c0ed2e0.vhd</MediaLink>
<SourceImageName>b39f27a8b8c64d52b05eac6a62ebad85__Ubuntu-13_10-amd64-server-20130808-alpha3-en-us-30GB</SourceImageName>
<OS>Linux</OS>
</OSVirtualHardDisk>
<RoleSize>ExtraSmall</RoleSize>
</Role>
</RoleList>
<SdkVersion />
<Locked>false</Locked>
<RollbackAllowed>false</RollbackAllowed>
<CreatedTime>2013-09-23T12:46:27Z</CreatedTime>
<LastModifiedTime>2013-09-23T20:38:58Z</LastModifiedTime>
<ExtendedProperties />
<PersistentVMDowntime>
<StartTime>2013-08-27T08:00:00Z</StartTime>
<EndTime>2013-09-27T20:00:00Z</EndTime>
<Status>PersistentVMUpdateScheduled</Status>
</PersistentVMDowntime>
<VirtualIPs>
<VirtualIP>
<Address>137.116.213.150</Address>
<IsDnsProgrammed>true</IsDnsProgrammed>
<Name>azure-elasticsearch-clusterContractContract</Name>
</VirtualIP>
</VirtualIPs>
</Deployment>
</Deployments>
</HostedService>