Move EC2 Discovery Tests to Mock Rest API (#50605) (#52270)

Move EC2 discovery tests to using the mock REST API introduced in
https://github.com/elastic/elasticsearch/pull/50550 instead of mocking
the AWS SDK classes manually.
Move the trivial remaining AWS SDK mocks to the single test suit that
was using them.
This commit is contained in:
Armin Braun 2020-02-12 18:35:50 +01:00 committed by GitHub
parent 26900bfb05
commit 6ea3f5ada1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 360 additions and 496 deletions

View File

@ -55,8 +55,7 @@ class AwsEc2ServiceImpl implements AwsEc2Service {
// proxy for testing
AmazonEC2 buildClient(AWSCredentialsProvider credentials, ClientConfiguration configuration) {
final AmazonEC2 client = new AmazonEC2Client(credentials, configuration);
return client;
return new AmazonEC2Client(credentials, configuration);
}
// pkg private for tests

View File

@ -0,0 +1,214 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.ec2;
import com.amazonaws.services.ec2.model.Instance;
import com.amazonaws.services.ec2.model.Tag;
import com.sun.net.httpserver.HttpServer;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.mocksocket.MockHttpServer;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import javax.xml.XMLConstants;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamWriter;
import java.io.StringWriter;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import static java.nio.charset.StandardCharsets.UTF_8;
@SuppressForbidden(reason = "use a http server")
public abstract class AbstractEC2MockAPITestCase extends ESTestCase {
protected HttpServer httpServer;
protected ThreadPool threadPool;
protected MockTransportService transportService;
protected NetworkService networkService = new NetworkService(Collections.emptyList());
@Before
public void setUp() throws Exception {
httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
httpServer.start();
threadPool = new TestThreadPool(EC2RetriesTests.class.getName());
transportService = createTransportService();
super.setUp();
}
protected abstract MockTransportService createTransportService();
protected Settings buildSettings(String accessKey) {
final InetSocketAddress address = httpServer.getAddress();
final String endpoint = "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort();
final MockSecureSettings mockSecure = new MockSecureSettings();
mockSecure.setString(Ec2ClientSettings.ACCESS_KEY_SETTING.getKey(), accessKey);
mockSecure.setString(Ec2ClientSettings.SECRET_KEY_SETTING.getKey(), "ec2_secret");
return Settings.builder().put(Ec2ClientSettings.ENDPOINT_SETTING.getKey(), endpoint).setSecureSettings(mockSecure).build();
}
@After
public void tearDown() throws Exception {
try {
IOUtils.close(transportService, () -> terminate(threadPool), () -> httpServer.stop(0));
} finally {
super.tearDown();
}
}
/**
* Generates a XML response that describe the EC2 instances
* TODO: org.elasticsearch.discovery.ec2.AmazonEC2Fixture uses pretty much the same code. We should dry up that test fixture.
*/
static byte[] generateDescribeInstancesResponse(List<Instance> instances) {
final XMLOutputFactory xmlOutputFactory = XMLOutputFactory.newFactory();
xmlOutputFactory.setProperty(XMLOutputFactory.IS_REPAIRING_NAMESPACES, true);
final StringWriter out = new StringWriter();
XMLStreamWriter sw;
try {
sw = xmlOutputFactory.createXMLStreamWriter(out);
sw.writeStartDocument();
String namespace = "http://ec2.amazonaws.com/doc/2013-02-01/";
sw.setDefaultNamespace(namespace);
sw.writeStartElement(XMLConstants.DEFAULT_NS_PREFIX, "DescribeInstancesResponse", namespace);
{
sw.writeStartElement("requestId");
sw.writeCharacters(UUID.randomUUID().toString());
sw.writeEndElement();
sw.writeStartElement("reservationSet");
{
for (Instance instance : instances) {
sw.writeStartElement("item");
{
sw.writeStartElement("reservationId");
sw.writeCharacters(UUID.randomUUID().toString());
sw.writeEndElement();
sw.writeStartElement("instancesSet");
{
sw.writeStartElement("item");
{
sw.writeStartElement("instanceId");
sw.writeCharacters(instance.getInstanceId());
sw.writeEndElement();
sw.writeStartElement("imageId");
sw.writeCharacters(instance.getImageId());
sw.writeEndElement();
sw.writeStartElement("instanceState");
{
sw.writeStartElement("code");
sw.writeCharacters("16");
sw.writeEndElement();
sw.writeStartElement("name");
sw.writeCharacters("running");
sw.writeEndElement();
}
sw.writeEndElement();
sw.writeStartElement("privateDnsName");
sw.writeCharacters(instance.getPrivateDnsName());
sw.writeEndElement();
sw.writeStartElement("dnsName");
sw.writeCharacters(instance.getPublicDnsName());
sw.writeEndElement();
sw.writeStartElement("instanceType");
sw.writeCharacters("m1.medium");
sw.writeEndElement();
sw.writeStartElement("placement");
{
sw.writeStartElement("availabilityZone");
sw.writeCharacters("use-east-1e");
sw.writeEndElement();
sw.writeEmptyElement("groupName");
sw.writeStartElement("tenancy");
sw.writeCharacters("default");
sw.writeEndElement();
}
sw.writeEndElement();
sw.writeStartElement("privateIpAddress");
sw.writeCharacters(instance.getPrivateIpAddress());
sw.writeEndElement();
sw.writeStartElement("ipAddress");
sw.writeCharacters(instance.getPublicIpAddress());
sw.writeEndElement();
sw.writeStartElement("tagSet");
for (Tag tag : instance.getTags()) {
sw.writeStartElement("item");
{
sw.writeStartElement("key");
sw.writeCharacters(tag.getKey());
sw.writeEndElement();
sw.writeStartElement("value");
sw.writeCharacters(tag.getValue());
sw.writeEndElement();
}
sw.writeEndElement();
}
sw.writeEndElement();
}
sw.writeEndElement();
}
sw.writeEndElement();
}
sw.writeEndElement();
}
sw.writeEndElement();
}
sw.writeEndElement();
sw.writeEndDocument();
sw.flush();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return out.toString().getBytes(UTF_8);
}
}

View File

@ -1,179 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.ec2;
import com.amazonaws.AmazonClientException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.ec2.AbstractAmazonEC2;
import com.amazonaws.services.ec2.model.DescribeInstancesRequest;
import com.amazonaws.services.ec2.model.DescribeInstancesResult;
import com.amazonaws.services.ec2.model.Filter;
import com.amazonaws.services.ec2.model.Instance;
import com.amazonaws.services.ec2.model.InstanceState;
import com.amazonaws.services.ec2.model.InstanceStateName;
import com.amazonaws.services.ec2.model.Reservation;
import com.amazonaws.services.ec2.model.Tag;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class AmazonEC2Mock extends AbstractAmazonEC2 {
private static final Logger logger = LogManager.getLogger(AmazonEC2Mock.class);
public static final String PREFIX_PRIVATE_IP = "10.0.0.";
public static final String PREFIX_PUBLIC_IP = "8.8.8.";
public static final String PREFIX_PUBLIC_DNS = "mock-ec2-";
public static final String SUFFIX_PUBLIC_DNS = ".amazon.com";
public static final String PREFIX_PRIVATE_DNS = "mock-ip-";
public static final String SUFFIX_PRIVATE_DNS = ".ec2.internal";
final List<Instance> instances = new ArrayList<>();
String endpoint;
final AWSCredentialsProvider credentials;
final ClientConfiguration configuration;
public AmazonEC2Mock(int nodes, List<List<Tag>> tagsList, AWSCredentialsProvider credentials, ClientConfiguration configuration) {
if (tagsList != null) {
assert tagsList.size() == nodes;
}
for (int node = 1; node < nodes + 1; node++) {
String instanceId = "node" + node;
Instance instance = new Instance()
.withInstanceId(instanceId)
.withState(new InstanceState().withName(InstanceStateName.Running))
.withPrivateDnsName(PREFIX_PRIVATE_DNS + instanceId + SUFFIX_PRIVATE_DNS)
.withPublicDnsName(PREFIX_PUBLIC_DNS + instanceId + SUFFIX_PUBLIC_DNS)
.withPrivateIpAddress(PREFIX_PRIVATE_IP + node)
.withPublicIpAddress(PREFIX_PUBLIC_IP + node);
if (tagsList != null) {
instance.setTags(tagsList.get(node-1));
}
instances.add(instance);
}
this.credentials = credentials;
this.configuration = configuration;
}
@Override
public DescribeInstancesResult describeInstances(DescribeInstancesRequest describeInstancesRequest)
throws AmazonClientException {
Collection<Instance> filteredInstances = new ArrayList<>();
logger.debug("--> mocking describeInstances");
for (Instance instance : instances) {
boolean tagFiltered = false;
boolean instanceFound = false;
Map<String, List<String>> expectedTags = new HashMap<>();
Map<String, List<String>> instanceTags = new HashMap<>();
for (Tag tag : instance.getTags()) {
List<String> tags = instanceTags.get(tag.getKey());
if (tags == null) {
tags = new ArrayList<>();
instanceTags.put(tag.getKey(), tags);
}
tags.add(tag.getValue());
}
for (Filter filter : describeInstancesRequest.getFilters()) {
// If we have the same tag name and one of the values, we add the instance
if (filter.getName().startsWith("tag:")) {
tagFiltered = true;
String tagName = filter.getName().substring(4);
// if we have more than one value for the same key, then the key is appended with .x
Pattern p = Pattern.compile("\\.\\d+", Pattern.DOTALL);
Matcher m = p.matcher(tagName);
if (m.find()) {
int i = tagName.lastIndexOf(".");
tagName = tagName.substring(0, i);
}
List<String> tags = expectedTags.get(tagName);
if (tags == null) {
tags = new ArrayList<>();
expectedTags.put(tagName, tags);
}
tags.addAll(filter.getValues());
}
}
if (tagFiltered) {
logger.debug("--> expected tags: [{}]", expectedTags);
logger.debug("--> instance tags: [{}]", instanceTags);
instanceFound = true;
for (Map.Entry<String, List<String>> expectedTagsEntry : expectedTags.entrySet()) {
List<String> instanceTagValues = instanceTags.get(expectedTagsEntry.getKey());
if (instanceTagValues == null) {
instanceFound = false;
break;
}
for (String expectedValue : expectedTagsEntry.getValue()) {
boolean valueFound = false;
for (String instanceTagValue : instanceTagValues) {
if (instanceTagValue.equals(expectedValue)) {
valueFound = true;
}
}
if (valueFound == false) {
instanceFound = false;
}
}
}
}
if (tagFiltered == false || instanceFound) {
logger.debug("--> instance added");
filteredInstances.add(instance);
} else {
logger.debug("--> instance filtered");
}
}
return new DescribeInstancesResult().withReservations(
new Reservation().withInstances(filteredInstances)
);
}
@Override
public void setEndpoint(String endpoint) throws IllegalArgumentException {
this.endpoint = endpoint;
}
@Override
public void shutdown() {
}
}

View File

@ -1,44 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.ec2;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.model.Tag;
import java.util.List;
public class AwsEc2ServiceMock extends AwsEc2ServiceImpl {
private final int nodes;
private final List<List<Tag>> tagsList;
public AwsEc2ServiceMock(int nodes, List<List<Tag>> tagsList) {
this.nodes = nodes;
this.tagsList = tagsList;
}
@Override
AmazonEC2 buildClient(AWSCredentialsProvider credentials, ClientConfiguration configuration) {
return new AmazonEC2Mock(nodes, tagsList, credentials, configuration);
}
}

View File

@ -20,7 +20,7 @@
package org.elasticsearch.discovery.ec2;
import com.amazonaws.http.HttpMethodName;
import com.sun.net.httpserver.HttpServer;
import com.amazonaws.services.ec2.model.Instance;
import org.apache.http.HttpStatus;
import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URLEncodedUtils;
@ -28,82 +28,41 @@ import org.elasticsearch.Version;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.discovery.SeedHostsProvider;
import org.elasticsearch.discovery.SeedHostsResolver;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.mocksocket.MockHttpServer;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.nio.MockNioTransport;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import javax.xml.XMLConstants;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.is;
@SuppressForbidden(reason = "use a http server")
public class EC2RetriesTests extends ESTestCase {
public class EC2RetriesTests extends AbstractEC2MockAPITestCase {
private HttpServer httpServer;
private ThreadPool threadPool;
private MockTransportService transportService;
private NetworkService networkService = new NetworkService(Collections.emptyList());
@Before
public void setUp() throws Exception {
httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
httpServer.start();
threadPool = new TestThreadPool(EC2RetriesTests.class.getName());
final MockNioTransport transport = new MockNioTransport(Settings.EMPTY, Version.CURRENT, threadPool, networkService,
@Override
protected MockTransportService createTransportService() {
return new MockTransportService(Settings.EMPTY, new MockNioTransport(Settings.EMPTY, Version.CURRENT, threadPool, networkService,
PageCacheRecycler.NON_RECYCLING_INSTANCE, new NamedWriteableRegistry(Collections.emptyList()),
new NoneCircuitBreakerService());
transportService =
new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
super.setUp();
}
@After
public void tearDown() throws Exception {
try {
IOUtils.close(transportService, () -> terminate(threadPool), () -> httpServer.stop(0));
} finally {
super.tearDown();
}
new NoneCircuitBreakerService()), threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
}
public void testEC2DiscoveryRetriesOnRateLimiting() throws IOException {
assumeFalse("https://github.com/elastic/elasticsearch/issues/51685", inFipsJvm());
final String accessKey = "ec2_access";
final List<String> hosts = Collections.singletonList("127.0.0.1:9000");
final List<String> hosts = Collections.singletonList("127.0.0.1:9300");
final Map<String, Integer> failedRequests = new ConcurrentHashMap<>();
// retry the same request 5 times at most
final int maxRetries = randomIntBetween(1, 5);
@ -125,7 +84,8 @@ public class EC2RetriesTests extends ESTestCase {
byte[] responseBody = null;
for (NameValuePair parse : URLEncodedUtils.parse(request, UTF_8)) {
if ("Action".equals(parse.getName())) {
responseBody = generateDescribeInstancesResponse(hosts);
responseBody = generateDescribeInstancesResponse(hosts.stream().map(
address -> new Instance().withPublicIpAddress(address)).collect(Collectors.toList()));
break;
}
}
@ -138,14 +98,7 @@ public class EC2RetriesTests extends ESTestCase {
}
fail("did not send response");
});
final InetSocketAddress address = httpServer.getAddress();
final String endpoint = "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort();
final MockSecureSettings mockSecure = new MockSecureSettings();
mockSecure.setString(Ec2ClientSettings.ACCESS_KEY_SETTING.getKey(), accessKey);
mockSecure.setString(Ec2ClientSettings.SECRET_KEY_SETTING.getKey(), "ec2_secret");
try (Ec2DiscoveryPlugin plugin = new Ec2DiscoveryPlugin(
Settings.builder().put(Ec2ClientSettings.ENDPOINT_SETTING.getKey(), endpoint).setSecureSettings(mockSecure).build())) {
try (Ec2DiscoveryPlugin plugin = new Ec2DiscoveryPlugin(buildSettings(accessKey))) {
final SeedHostsProvider seedHostsProvider = plugin.getSeedHostProviders(transportService, networkService).get("ec2").get();
final SeedHostsResolver resolver = new SeedHostsResolver("test", Settings.EMPTY, transportService, seedHostsProvider);
resolver.start();
@ -156,112 +109,4 @@ public class EC2RetriesTests extends ESTestCase {
assertThat(failedRequests.values().iterator().next(), is(maxRetries));
}
}
/**
* Generates a XML response that describe the EC2 instances
* TODO: org.elasticsearch.discovery.ec2.AmazonEC2Fixture uses pretty much the same code. We should dry up that test fixture.
*/
private byte[] generateDescribeInstancesResponse(List<String> nodes) {
final XMLOutputFactory xmlOutputFactory = XMLOutputFactory.newFactory();
xmlOutputFactory.setProperty(XMLOutputFactory.IS_REPAIRING_NAMESPACES, true);
final StringWriter out = new StringWriter();
XMLStreamWriter sw;
try {
sw = xmlOutputFactory.createXMLStreamWriter(out);
sw.writeStartDocument();
String namespace = "http://ec2.amazonaws.com/doc/2013-02-01/";
sw.setDefaultNamespace(namespace);
sw.writeStartElement(XMLConstants.DEFAULT_NS_PREFIX, "DescribeInstancesResponse", namespace);
{
sw.writeStartElement("requestId");
sw.writeCharacters(UUID.randomUUID().toString());
sw.writeEndElement();
sw.writeStartElement("reservationSet");
{
for (String address : nodes) {
sw.writeStartElement("item");
{
sw.writeStartElement("reservationId");
sw.writeCharacters(UUID.randomUUID().toString());
sw.writeEndElement();
sw.writeStartElement("instancesSet");
{
sw.writeStartElement("item");
{
sw.writeStartElement("instanceId");
sw.writeCharacters(UUID.randomUUID().toString());
sw.writeEndElement();
sw.writeStartElement("imageId");
sw.writeCharacters(UUID.randomUUID().toString());
sw.writeEndElement();
sw.writeStartElement("instanceState");
{
sw.writeStartElement("code");
sw.writeCharacters("16");
sw.writeEndElement();
sw.writeStartElement("name");
sw.writeCharacters("running");
sw.writeEndElement();
}
sw.writeEndElement();
sw.writeStartElement("privateDnsName");
sw.writeCharacters(address);
sw.writeEndElement();
sw.writeStartElement("dnsName");
sw.writeCharacters(address);
sw.writeEndElement();
sw.writeStartElement("instanceType");
sw.writeCharacters("m1.medium");
sw.writeEndElement();
sw.writeStartElement("placement");
{
sw.writeStartElement("availabilityZone");
sw.writeCharacters("use-east-1e");
sw.writeEndElement();
sw.writeEmptyElement("groupName");
sw.writeStartElement("tenancy");
sw.writeCharacters("default");
sw.writeEndElement();
}
sw.writeEndElement();
sw.writeStartElement("privateIpAddress");
sw.writeCharacters(address);
sw.writeEndElement();
sw.writeStartElement("ipAddress");
sw.writeCharacters(address);
sw.writeEndElement();
}
sw.writeEndElement();
}
sw.writeEndElement();
}
sw.writeEndElement();
}
sw.writeEndElement();
}
sw.writeEndElement();
sw.writeEndDocument();
sw.flush();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return out.toString().getBytes(UTF_8);
}
}

View File

@ -1,38 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.ec2;
import com.amazonaws.services.ec2.model.Tag;
import org.elasticsearch.common.settings.Settings;
import java.util.List;
public class Ec2DiscoveryPluginMock extends Ec2DiscoveryPlugin {
Ec2DiscoveryPluginMock(Settings settings) {
this(settings, 1, null);
}
public Ec2DiscoveryPluginMock(Settings settings, int nodes, List<List<Tag>> tagsList) {
super(settings, new AwsEc2ServiceMock(nodes, tagsList));
}
}

View File

@ -19,9 +19,13 @@
package org.elasticsearch.discovery.ec2;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.BasicSessionCredentials;
import com.amazonaws.services.ec2.AbstractAmazonEC2;
import com.amazonaws.services.ec2.AmazonEC2;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;
@ -189,4 +193,37 @@ public class Ec2DiscoveryPluginTests extends ESTestCase {
}
}
}
private static class Ec2DiscoveryPluginMock extends Ec2DiscoveryPlugin {
Ec2DiscoveryPluginMock(Settings settings) {
super(settings, new AwsEc2ServiceImpl() {
@Override
AmazonEC2 buildClient(AWSCredentialsProvider credentials, ClientConfiguration configuration) {
return new AmazonEC2Mock(credentials, configuration);
}
});
}
}
private static class AmazonEC2Mock extends AbstractAmazonEC2 {
String endpoint;
final AWSCredentialsProvider credentials;
final ClientConfiguration configuration;
AmazonEC2Mock(AWSCredentialsProvider credentials, ClientConfiguration configuration) {
this.credentials = credentials;
this.configuration = configuration;
}
@Override
public void setEndpoint(String endpoint) throws IllegalArgumentException {
this.endpoint = endpoint;
}
@Override
public void shutdown() {
}
}
}

View File

@ -19,71 +19,68 @@
package org.elasticsearch.discovery.ec2;
import com.amazonaws.http.HttpMethodName;
import com.amazonaws.services.ec2.model.Instance;
import com.amazonaws.services.ec2.model.InstanceState;
import com.amazonaws.services.ec2.model.InstanceStateName;
import com.amazonaws.services.ec2.model.Tag;
import org.apache.http.HttpStatus;
import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URLEncodedUtils;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.nio.MockNioTransport;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
public class Ec2DiscoveryTests extends ESTestCase {
@SuppressForbidden(reason = "use a http server")
public class Ec2DiscoveryTests extends AbstractEC2MockAPITestCase {
private static final String SUFFIX_PRIVATE_DNS = ".ec2.internal";
private static final String PREFIX_PRIVATE_DNS = "mock-ip-";
private static final String SUFFIX_PUBLIC_DNS = ".amazon.com";
private static final String PREFIX_PUBLIC_DNS = "mock-ec2-";
private static final String PREFIX_PUBLIC_IP = "8.8.8.";
private static final String PREFIX_PRIVATE_IP = "10.0.0.";
protected static ThreadPool threadPool;
protected MockTransportService transportService;
private Map<String, TransportAddress> poorMansDNS = new ConcurrentHashMap<>();
@BeforeClass
public static void createThreadPool() {
threadPool = new TestThreadPool(Ec2DiscoveryTests.class.getName());
}
@AfterClass
public static void stopThreadPool() throws InterruptedException {
if (threadPool !=null) {
terminate(threadPool);
threadPool = null;
}
}
@Before
public void createTransportService() {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
protected MockTransportService createTransportService() {
final Transport transport = new MockNioTransport(Settings.EMPTY, Version.CURRENT, threadPool,
new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry,
new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, writableRegistry(),
new NoneCircuitBreakerService()) {
@Override
public TransportAddress[] addressesFromString(String address) throws UnknownHostException {
public TransportAddress[] addressesFromString(String address) {
// we just need to ensure we don't resolve DNS here
return new TransportAddress[] {poorMansDNS.getOrDefault(address, buildNewFakeTransportAddress())};
}
};
transportService = new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
null);
return new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
}
protected List<TransportAddress> buildDynamicHosts(Settings nodeSettings, int nodes) {
@ -91,8 +88,65 @@ public class Ec2DiscoveryTests extends ESTestCase {
}
protected List<TransportAddress> buildDynamicHosts(Settings nodeSettings, int nodes, List<List<Tag>> tagsList) {
try (Ec2DiscoveryPluginMock plugin = new Ec2DiscoveryPluginMock(Settings.EMPTY, nodes, tagsList)) {
final String accessKey = "ec2_key";
try (Ec2DiscoveryPlugin plugin = new Ec2DiscoveryPlugin(buildSettings(accessKey))) {
AwsEc2SeedHostsProvider provider = new AwsEc2SeedHostsProvider(nodeSettings, transportService, plugin.ec2Service);
httpServer.createContext("/", exchange -> {
if (exchange.getRequestMethod().equals(HttpMethodName.POST.name())) {
final String request = Streams.readFully(exchange.getRequestBody()).toBytesRef().utf8ToString();
final String userAgent = exchange.getRequestHeaders().getFirst("User-Agent");
if (userAgent != null && userAgent.startsWith("aws-sdk-java")) {
final String auth = exchange.getRequestHeaders().getFirst("Authorization");
if (auth == null || auth.contains(accessKey) == false) {
throw new IllegalArgumentException("wrong access key: " + auth);
}
// Simulate an EC2 DescribeInstancesResponse
final Map<String, List<String>> tagsIncluded = new HashMap<>();
final String[] params = request.split("&");
Arrays.stream(params).filter(entry -> entry.startsWith("Filter.") && entry.contains("=tag%3A"))
.forEach(entry -> {
final int startIndex = "Filter.".length();
final int filterId = Integer.parseInt(entry.substring(startIndex, entry.indexOf(".", startIndex)));
tagsIncluded.put(entry.substring(entry.indexOf("=tag%3A") + "=tag%3A".length()),
Arrays.stream(params)
.filter(param -> param.startsWith("Filter." + filterId + ".Value."))
.map(param -> param.substring(param.indexOf("=") + 1))
.collect(Collectors.toList()));
}
);
final List<Instance> instances = IntStream.range(1, nodes + 1).mapToObj(node -> {
final String instanceId = "node" + node;
final Instance instance = new Instance()
.withInstanceId(instanceId)
.withState(new InstanceState().withName(InstanceStateName.Running))
.withPrivateDnsName(PREFIX_PRIVATE_DNS + instanceId + SUFFIX_PRIVATE_DNS)
.withPublicDnsName(PREFIX_PUBLIC_DNS + instanceId + SUFFIX_PUBLIC_DNS)
.withPrivateIpAddress(PREFIX_PRIVATE_IP + node)
.withPublicIpAddress(PREFIX_PUBLIC_IP + node);
if (tagsList != null) {
instance.setTags(tagsList.get(node - 1));
}
return instance;
}).filter(instance ->
tagsIncluded.entrySet().stream().allMatch(entry -> instance.getTags().stream()
.filter(t -> t.getKey().equals(entry.getKey()))
.map(Tag::getValue)
.collect(Collectors.toList())
.containsAll(entry.getValue())))
.collect(Collectors.toList());
for (NameValuePair parse : URLEncodedUtils.parse(request, UTF_8)) {
if ("Action".equals(parse.getName())) {
final byte[] responseBody = generateDescribeInstancesResponse(instances);
exchange.getResponseHeaders().set("Content-Type", "text/xml; charset=UTF-8");
exchange.sendResponseHeaders(HttpStatus.SC_OK, responseBody.length);
exchange.getResponseBody().write(responseBody);
return;
}
}
}
}
fail("did not send response");
});
List<TransportAddress> dynamicHosts = provider.getSeedAddresses(null);
logger.debug("--> addresses found: {}", dynamicHosts);
return dynamicHosts;
@ -113,7 +167,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
public void testPrivateIp() throws InterruptedException {
int nodes = randomInt(10);
for (int i = 0; i < nodes; i++) {
poorMansDNS.put(AmazonEC2Mock.PREFIX_PRIVATE_IP + (i+1), buildNewFakeTransportAddress());
poorMansDNS.put(PREFIX_PRIVATE_IP + (i+1), buildNewFakeTransportAddress());
}
Settings nodeSettings = Settings.builder()
.put(AwsEc2Service.HOST_TYPE_SETTING.getKey(), "private_ip")
@ -123,7 +177,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
// We check that we are using here expected address
int node = 1;
for (TransportAddress address : transportAddresses) {
TransportAddress expected = poorMansDNS.get(AmazonEC2Mock.PREFIX_PRIVATE_IP + node++);
TransportAddress expected = poorMansDNS.get(PREFIX_PRIVATE_IP + node++);
assertEquals(address, expected);
}
}
@ -131,7 +185,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
public void testPublicIp() throws InterruptedException {
int nodes = randomInt(10);
for (int i = 0; i < nodes; i++) {
poorMansDNS.put(AmazonEC2Mock.PREFIX_PUBLIC_IP + (i+1), buildNewFakeTransportAddress());
poorMansDNS.put(PREFIX_PUBLIC_IP + (i+1), buildNewFakeTransportAddress());
}
Settings nodeSettings = Settings.builder()
.put(AwsEc2Service.HOST_TYPE_SETTING.getKey(), "public_ip")
@ -141,7 +195,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
// We check that we are using here expected address
int node = 1;
for (TransportAddress address : dynamicHosts) {
TransportAddress expected = poorMansDNS.get(AmazonEC2Mock.PREFIX_PUBLIC_IP + node++);
TransportAddress expected = poorMansDNS.get(PREFIX_PUBLIC_IP + node++);
assertEquals(address, expected);
}
}
@ -150,8 +204,8 @@ public class Ec2DiscoveryTests extends ESTestCase {
int nodes = randomInt(10);
for (int i = 0; i < nodes; i++) {
String instanceId = "node" + (i+1);
poorMansDNS.put(AmazonEC2Mock.PREFIX_PRIVATE_DNS + instanceId +
AmazonEC2Mock.SUFFIX_PRIVATE_DNS, buildNewFakeTransportAddress());
poorMansDNS.put(PREFIX_PRIVATE_DNS + instanceId +
SUFFIX_PRIVATE_DNS, buildNewFakeTransportAddress());
}
Settings nodeSettings = Settings.builder()
.put(AwsEc2Service.HOST_TYPE_SETTING.getKey(), "private_dns")
@ -163,7 +217,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
for (TransportAddress address : dynamicHosts) {
String instanceId = "node" + node++;
TransportAddress expected = poorMansDNS.get(
AmazonEC2Mock.PREFIX_PRIVATE_DNS + instanceId + AmazonEC2Mock.SUFFIX_PRIVATE_DNS);
PREFIX_PRIVATE_DNS + instanceId + SUFFIX_PRIVATE_DNS);
assertEquals(address, expected);
}
}
@ -172,8 +226,8 @@ public class Ec2DiscoveryTests extends ESTestCase {
int nodes = randomInt(10);
for (int i = 0; i < nodes; i++) {
String instanceId = "node" + (i+1);
poorMansDNS.put(AmazonEC2Mock.PREFIX_PUBLIC_DNS + instanceId
+ AmazonEC2Mock.SUFFIX_PUBLIC_DNS, buildNewFakeTransportAddress());
poorMansDNS.put(PREFIX_PUBLIC_DNS + instanceId
+ SUFFIX_PUBLIC_DNS, buildNewFakeTransportAddress());
}
Settings nodeSettings = Settings.builder()
.put(AwsEc2Service.HOST_TYPE_SETTING.getKey(), "public_dns")
@ -185,7 +239,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
for (TransportAddress address : dynamicHosts) {
String instanceId = "node" + node++;
TransportAddress expected = poorMansDNS.get(
AmazonEC2Mock.PREFIX_PUBLIC_DNS + instanceId + AmazonEC2Mock.SUFFIX_PUBLIC_DNS);
PREFIX_PUBLIC_DNS + instanceId + SUFFIX_PUBLIC_DNS);
assertEquals(address, expected);
}
}
@ -289,8 +343,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
}
}
abstract class DummyEc2SeedHostsProvider extends AwsEc2SeedHostsProvider {
abstract static class DummyEc2SeedHostsProvider extends AwsEc2SeedHostsProvider {
public int fetchCount = 0;
DummyEc2SeedHostsProvider(Settings settings, TransportService transportService, AwsEc2Service service) {
super(settings, transportService, service);
@ -298,7 +351,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
}
public void testGetNodeListEmptyCache() {
AwsEc2Service awsEc2Service = new AwsEc2ServiceMock(1, null);
AwsEc2Service awsEc2Service = new AwsEc2ServiceImpl();
DummyEc2SeedHostsProvider provider = new DummyEc2SeedHostsProvider(Settings.EMPTY, transportService, awsEc2Service) {
@Override
protected List<TransportAddress> fetchDynamicNodes() {
@ -311,27 +364,4 @@ public class Ec2DiscoveryTests extends ESTestCase {
}
assertThat(provider.fetchCount, is(1));
}
public void testGetNodeListCached() throws Exception {
Settings.Builder builder = Settings.builder()
.put(AwsEc2Service.NODE_CACHE_TIME_SETTING.getKey(), "500ms");
try (Ec2DiscoveryPluginMock plugin = new Ec2DiscoveryPluginMock(Settings.EMPTY)) {
DummyEc2SeedHostsProvider provider = new DummyEc2SeedHostsProvider(builder.build(), transportService, plugin.ec2Service) {
@Override
protected List<TransportAddress> fetchDynamicNodes() {
fetchCount++;
return Ec2DiscoveryTests.this.buildDynamicHosts(Settings.EMPTY, 1);
}
};
for (int i=0; i<3; i++) {
provider.getSeedAddresses(null);
}
assertThat(provider.fetchCount, is(1));
Thread.sleep(1_000L); // wait for cache to expire
for (int i=0; i<3; i++) {
provider.getSeedAddresses(null);
}
assertThat(provider.fetchCount, is(2));
}
}
}