From 33341600037bc72464ef56d6be5650ca99ac2069 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Fri, 16 Oct 2015 12:21:46 +0200 Subject: [PATCH] improved building of disco nodes * improved retry policy of ec2 client * cache results for 10s --- docs/plugins/discovery-ec2.asciidoc | 6 ++- .../cloud/aws/AwsEc2Service.java | 1 + .../cloud/aws/AwsEc2ServiceImpl.java | 22 ++++++++++ .../ec2/AwsEc2UnicastHostsProvider.java | 33 ++++++++++++++ .../discovery/ec2/Ec2DiscoveryTests.java | 44 +++++++++++++++++++ 5 files changed, 105 insertions(+), 1 deletion(-) diff --git a/docs/plugins/discovery-ec2.asciidoc b/docs/plugins/discovery-ec2.asciidoc index 5ac208576df..a2b80495003 100644 --- a/docs/plugins/discovery-ec2.asciidoc +++ b/docs/plugins/discovery-ec2.asciidoc @@ -165,6 +165,11 @@ The following are a list of settings (prefixed with `discovery.ec2`) that can fu Defaults to `3s`. If no unit like `ms`, `s` or `m` is specified, milliseconds are used. +`node_cache_time`:: + + How long the list of hosts is cached to prevent further requests to the AWS API. + Defaults to `10s`. + [IMPORTANT] .Binding the network host @@ -195,7 +200,6 @@ as valid network host settings: |`_ec2_` |equivalent to _ec2:privateIpv4_. |================================================================== - [[discovery-ec2-permissions]] ===== Recommended EC2 Permissions diff --git a/plugins/discovery-ec2/src/main/java/org/elasticsearch/cloud/aws/AwsEc2Service.java b/plugins/discovery-ec2/src/main/java/org/elasticsearch/cloud/aws/AwsEc2Service.java index ab2b54633f4..a427b4af4ab 100644 --- a/plugins/discovery-ec2/src/main/java/org/elasticsearch/cloud/aws/AwsEc2Service.java +++ b/plugins/discovery-ec2/src/main/java/org/elasticsearch/cloud/aws/AwsEc2Service.java @@ -49,6 +49,7 @@ public interface AwsEc2Service extends LifecycleComponent { public static final String GROUPS = "discovery.ec2.groups"; public static final String TAG_PREFIX = "discovery.ec2.tag."; public static final String AVAILABILITY_ZONES = "discovery.ec2.availability_zones"; + public static final String NODE_CACHE_TIME = "discovery.ec2.node_cache_time"; } AmazonEC2 client(); diff --git a/plugins/discovery-ec2/src/main/java/org/elasticsearch/cloud/aws/AwsEc2ServiceImpl.java b/plugins/discovery-ec2/src/main/java/org/elasticsearch/cloud/aws/AwsEc2ServiceImpl.java index 26e001c2666..39ece106df8 100644 --- a/plugins/discovery-ec2/src/main/java/org/elasticsearch/cloud/aws/AwsEc2ServiceImpl.java +++ b/plugins/discovery-ec2/src/main/java/org/elasticsearch/cloud/aws/AwsEc2ServiceImpl.java @@ -19,10 +19,13 @@ package org.elasticsearch.cloud.aws; +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonWebServiceRequest; import com.amazonaws.ClientConfiguration; import com.amazonaws.Protocol; import com.amazonaws.auth.*; import com.amazonaws.internal.StaticCredentialsProvider; +import com.amazonaws.retry.RetryPolicy; import com.amazonaws.services.ec2.AmazonEC2; import com.amazonaws.services.ec2.AmazonEC2Client; import org.elasticsearch.ElasticsearchException; @@ -36,6 +39,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; import java.util.Locale; +import java.util.Random; /** * @@ -103,6 +107,24 @@ public class AwsEc2ServiceImpl extends AbstractLifecycleComponent } } + // Increase the number of retries in case of 5xx API responses + final Random rand = new Random(); + RetryPolicy retryPolicy = new RetryPolicy( + RetryPolicy.RetryCondition.NO_RETRY_CONDITION, + new RetryPolicy.BackoffStrategy() { + @Override + public long delayBeforeNextRetry(AmazonWebServiceRequest originalRequest, + AmazonClientException exception, + int retriesAttempted) { + // with 10 retries the max delay time is 320s/320000ms (10 * 2^5 * 1 * 1000) + logger.warn("EC2 API request failed, retry again. Reason was:", exception); + return 1000L * (long) (10d * Math.pow(2, ((double) retriesAttempted) / 2.0d) * (1.0d + rand.nextDouble())); + } + }, + 10, + false); + clientConfiguration.setRetryPolicy(retryPolicy); + AWSCredentialsProvider credentials; if (account == null && key == null) { diff --git a/plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2UnicastHostsProvider.java b/plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2UnicastHostsProvider.java index 94c65047847..f7e70281a3d 100644 --- a/plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2UnicastHostsProvider.java +++ b/plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2UnicastHostsProvider.java @@ -31,6 +31,8 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.SingleObjectCache; import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider; import org.elasticsearch.transport.TransportService; @@ -64,6 +66,8 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni private final HostType hostType; + private final DiscoNodesCache discoNodes; + @Inject public AwsEc2UnicastHostsProvider(Settings settings, TransportService transportService, AwsEc2Service awsEc2Service, Version version) { super(settings); @@ -74,6 +78,9 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni this.hostType = HostType.valueOf(settings.get(DISCOVERY_EC2.HOST_TYPE, "private_ip") .toUpperCase(Locale.ROOT)); + this.discoNodes = new DiscoNodesCache(this.settings.getAsTime(DISCOVERY_EC2.NODE_CACHE_TIME, + TimeValue.timeValueMillis(10_000L))); + this.bindAnyGroup = settings.getAsBoolean(DISCOVERY_EC2.ANY_GROUP, true); this.groups = new HashSet<>(); groups.addAll(Arrays.asList(settings.getAsArray(DISCOVERY_EC2.GROUPS))); @@ -94,6 +101,11 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni @Override public List buildDynamicNodes() { + return discoNodes.getOrRefresh(); + } + + protected List fetchDynamicNodes() { + List discoNodes = new ArrayList<>(); DescribeInstancesResult descInstances; @@ -199,4 +211,25 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni return describeInstancesRequest; } + + private final class DiscoNodesCache extends SingleObjectCache> { + + private boolean empty = true; + + protected DiscoNodesCache(TimeValue refreshInterval) { + super(refreshInterval, new ArrayList<>()); + } + + @Override + protected boolean needsRefresh() { + return (empty || super.needsRefresh()); + } + + @Override + protected List refresh() { + List nodes = fetchDynamicNodes(); + empty = nodes.isEmpty(); + return nodes; + } + } } diff --git a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java index ca9493bc4c9..4fc3faef316 100644 --- a/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java +++ b/plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.local.LocalTransport; import org.junit.AfterClass; import org.junit.Before; @@ -231,4 +232,47 @@ public class Ec2DiscoveryTests extends ESTestCase { assertThat(discoveryNodes, hasSize(prodInstances)); } + abstract class DummyEc2HostProvider extends AwsEc2UnicastHostsProvider { + public int fetchCount = 0; + public DummyEc2HostProvider(Settings settings, TransportService transportService, AwsEc2Service service, Version version) { + super(settings, transportService, service, version); + } + } + + public void testGetNodeListEmptyCache() throws Exception { + AwsEc2Service awsEc2Service = new AwsEc2ServiceMock(Settings.EMPTY, 1, null); + DummyEc2HostProvider provider = new DummyEc2HostProvider(Settings.EMPTY, transportService, awsEc2Service, Version.CURRENT) { + @Override + protected List fetchDynamicNodes() { + fetchCount++; + return new ArrayList<>(); + } + }; + for (int i=0; i<3; i++) { + provider.buildDynamicNodes(); + } + assertEquals(provider.fetchCount, is(3)); + } + + public void testGetNodeListCached() throws Exception { + Settings.Builder builder = Settings.settingsBuilder() + .put(DISCOVERY_EC2.NODE_CACHE_TIME, "500ms"); + AwsEc2Service awsEc2Service = new AwsEc2ServiceMock(Settings.EMPTY, 1, null); + DummyEc2HostProvider provider = new DummyEc2HostProvider(builder.build(), transportService, awsEc2Service, Version.CURRENT) { + @Override + protected List fetchDynamicNodes() { + fetchCount++; + return Ec2DiscoveryTests.this.buildDynamicNodes(Settings.EMPTY, 1); + } + }; + for (int i=0; i<3; i++) { + provider.buildDynamicNodes(); + } + assertEquals(provider.fetchCount, is(1)); + Thread.sleep(1_000L); // wait for cache to expire + for (int i=0; i<3; i++) { + provider.buildDynamicNodes(); + } + assertEquals(provider.fetchCount, is(2)); + } }