Merge branch 'ec2-improve-disco-nodes' of https://github.com/chaudum/elasticsearch into chaudum-ec2-improve-disco-nodes

This commit is contained in:
David Pilato 2015-11-02 14:52:08 +01:00
commit 77521560ed
5 changed files with 105 additions and 1 deletions

View File

@ -165,6 +165,11 @@ The following are a list of settings (prefixed with `discovery.ec2`) that can fu
Defaults to `3s`. If no unit like `ms`, `s` or `m` is specified,
milliseconds are used.
`node_cache_time`::
How long the list of hosts is cached to prevent further requests to the AWS API.
Defaults to `10s`.
[IMPORTANT]
.Binding the network host
@ -195,7 +200,6 @@ as valid network host settings:
|`_ec2_` |equivalent to _ec2:privateIpv4_.
|==================================================================
[[discovery-ec2-permissions]]
===== Recommended EC2 Permissions

View File

@ -49,6 +49,7 @@ public interface AwsEc2Service extends LifecycleComponent<AwsEc2Service> {
public static final String GROUPS = "discovery.ec2.groups";
public static final String TAG_PREFIX = "discovery.ec2.tag.";
public static final String AVAILABILITY_ZONES = "discovery.ec2.availability_zones";
public static final String NODE_CACHE_TIME = "discovery.ec2.node_cache_time";
}
AmazonEC2 client();

View File

@ -19,10 +19,13 @@
package org.elasticsearch.cloud.aws;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.*;
import com.amazonaws.internal.StaticCredentialsProvider;
import com.amazonaws.retry.RetryPolicy;
import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.AmazonEC2Client;
import org.elasticsearch.ElasticsearchException;
@ -36,6 +39,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import java.util.Locale;
import java.util.Random;
/**
*
@ -103,6 +107,24 @@ public class AwsEc2ServiceImpl extends AbstractLifecycleComponent<AwsEc2Service>
}
}
// Increase the number of retries in case of 5xx API responses
final Random rand = new Random();
RetryPolicy retryPolicy = new RetryPolicy(
RetryPolicy.RetryCondition.NO_RETRY_CONDITION,
new RetryPolicy.BackoffStrategy() {
@Override
public long delayBeforeNextRetry(AmazonWebServiceRequest originalRequest,
AmazonClientException exception,
int retriesAttempted) {
// with 10 retries the max delay time is 320s/320000ms (10 * 2^5 * 1 * 1000)
logger.warn("EC2 API request failed, retry again. Reason was:", exception);
return 1000L * (long) (10d * Math.pow(2, ((double) retriesAttempted) / 2.0d) * (1.0d + rand.nextDouble()));
}
},
10,
false);
clientConfiguration.setRetryPolicy(retryPolicy);
AWSCredentialsProvider credentials;
if (account == null && key == null) {

View File

@ -31,6 +31,8 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.SingleObjectCache;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
import org.elasticsearch.transport.TransportService;
@ -64,6 +66,8 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
private final HostType hostType;
private final DiscoNodesCache discoNodes;
@Inject
public AwsEc2UnicastHostsProvider(Settings settings, TransportService transportService, AwsEc2Service awsEc2Service, Version version) {
super(settings);
@ -74,6 +78,9 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
this.hostType = HostType.valueOf(settings.get(DISCOVERY_EC2.HOST_TYPE, "private_ip")
.toUpperCase(Locale.ROOT));
this.discoNodes = new DiscoNodesCache(this.settings.getAsTime(DISCOVERY_EC2.NODE_CACHE_TIME,
TimeValue.timeValueMillis(10_000L)));
this.bindAnyGroup = settings.getAsBoolean(DISCOVERY_EC2.ANY_GROUP, true);
this.groups = new HashSet<>();
groups.addAll(Arrays.asList(settings.getAsArray(DISCOVERY_EC2.GROUPS)));
@ -94,6 +101,11 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
@Override
public List<DiscoveryNode> buildDynamicNodes() {
return discoNodes.getOrRefresh();
}
protected List<DiscoveryNode> fetchDynamicNodes() {
List<DiscoveryNode> discoNodes = new ArrayList<>();
DescribeInstancesResult descInstances;
@ -199,4 +211,25 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
return describeInstancesRequest;
}
private final class DiscoNodesCache extends SingleObjectCache<List<DiscoveryNode>> {
private boolean empty = true;
protected DiscoNodesCache(TimeValue refreshInterval) {
super(refreshInterval, new ArrayList<>());
}
@Override
protected boolean needsRefresh() {
return (empty || super.needsRefresh());
}
@Override
protected List<DiscoveryNode> refresh() {
List<DiscoveryNode> nodes = fetchDynamicNodes();
empty = nodes.isEmpty();
return nodes;
}
}
}

View File

@ -32,6 +32,7 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.local.LocalTransport;
import org.junit.AfterClass;
import org.junit.Before;
@ -231,4 +232,47 @@ public class Ec2DiscoveryTests extends ESTestCase {
assertThat(discoveryNodes, hasSize(prodInstances));
}
abstract class DummyEc2HostProvider extends AwsEc2UnicastHostsProvider {
public int fetchCount = 0;
public DummyEc2HostProvider(Settings settings, TransportService transportService, AwsEc2Service service, Version version) {
super(settings, transportService, service, version);
}
}
public void testGetNodeListEmptyCache() throws Exception {
AwsEc2Service awsEc2Service = new AwsEc2ServiceMock(Settings.EMPTY, 1, null);
DummyEc2HostProvider provider = new DummyEc2HostProvider(Settings.EMPTY, transportService, awsEc2Service, Version.CURRENT) {
@Override
protected List<DiscoveryNode> fetchDynamicNodes() {
fetchCount++;
return new ArrayList<>();
}
};
for (int i=0; i<3; i++) {
provider.buildDynamicNodes();
}
assertEquals(provider.fetchCount, is(3));
}
public void testGetNodeListCached() throws Exception {
Settings.Builder builder = Settings.settingsBuilder()
.put(DISCOVERY_EC2.NODE_CACHE_TIME, "500ms");
AwsEc2Service awsEc2Service = new AwsEc2ServiceMock(Settings.EMPTY, 1, null);
DummyEc2HostProvider provider = new DummyEc2HostProvider(builder.build(), transportService, awsEc2Service, Version.CURRENT) {
@Override
protected List<DiscoveryNode> fetchDynamicNodes() {
fetchCount++;
return Ec2DiscoveryTests.this.buildDynamicNodes(Settings.EMPTY, 1);
}
};
for (int i=0; i<3; i++) {
provider.buildDynamicNodes();
}
assertEquals(provider.fetchCount, is(1));
Thread.sleep(1_000L); // wait for cache to expire
for (int i=0; i<3; i++) {
provider.buildDynamicNodes();
}
assertEquals(provider.fetchCount, is(2));
}
}