Merge branch 'chaudum-ec2-improve-disco-nodes'
This commit is contained in:
commit
999d5ab7a0
|
@ -165,6 +165,11 @@ The following are a list of settings (prefixed with `discovery.ec2`) that can fu
|
||||||
Defaults to `3s`. If no unit like `ms`, `s` or `m` is specified,
|
Defaults to `3s`. If no unit like `ms`, `s` or `m` is specified,
|
||||||
milliseconds are used.
|
milliseconds are used.
|
||||||
|
|
||||||
|
`node_cache_time`::
|
||||||
|
|
||||||
|
How long the list of hosts is cached to prevent further requests to the AWS API.
|
||||||
|
Defaults to `10s`.
|
||||||
|
|
||||||
|
|
||||||
[IMPORTANT]
|
[IMPORTANT]
|
||||||
.Binding the network host
|
.Binding the network host
|
||||||
|
@ -195,7 +200,6 @@ as valid network host settings:
|
||||||
|`_ec2_` |equivalent to _ec2:privateIpv4_.
|
|`_ec2_` |equivalent to _ec2:privateIpv4_.
|
||||||
|==================================================================
|
|==================================================================
|
||||||
|
|
||||||
|
|
||||||
[[discovery-ec2-permissions]]
|
[[discovery-ec2-permissions]]
|
||||||
===== Recommended EC2 Permissions
|
===== Recommended EC2 Permissions
|
||||||
|
|
||||||
|
|
|
@ -49,6 +49,7 @@ public interface AwsEc2Service extends LifecycleComponent<AwsEc2Service> {
|
||||||
public static final String GROUPS = "discovery.ec2.groups";
|
public static final String GROUPS = "discovery.ec2.groups";
|
||||||
public static final String TAG_PREFIX = "discovery.ec2.tag.";
|
public static final String TAG_PREFIX = "discovery.ec2.tag.";
|
||||||
public static final String AVAILABILITY_ZONES = "discovery.ec2.availability_zones";
|
public static final String AVAILABILITY_ZONES = "discovery.ec2.availability_zones";
|
||||||
|
public static final String NODE_CACHE_TIME = "discovery.ec2.node_cache_time";
|
||||||
}
|
}
|
||||||
|
|
||||||
AmazonEC2 client();
|
AmazonEC2 client();
|
||||||
|
|
|
@ -19,10 +19,13 @@
|
||||||
|
|
||||||
package org.elasticsearch.cloud.aws;
|
package org.elasticsearch.cloud.aws;
|
||||||
|
|
||||||
|
import com.amazonaws.AmazonClientException;
|
||||||
|
import com.amazonaws.AmazonWebServiceRequest;
|
||||||
import com.amazonaws.ClientConfiguration;
|
import com.amazonaws.ClientConfiguration;
|
||||||
import com.amazonaws.Protocol;
|
import com.amazonaws.Protocol;
|
||||||
import com.amazonaws.auth.*;
|
import com.amazonaws.auth.*;
|
||||||
import com.amazonaws.internal.StaticCredentialsProvider;
|
import com.amazonaws.internal.StaticCredentialsProvider;
|
||||||
|
import com.amazonaws.retry.RetryPolicy;
|
||||||
import com.amazonaws.services.ec2.AmazonEC2;
|
import com.amazonaws.services.ec2.AmazonEC2;
|
||||||
import com.amazonaws.services.ec2.AmazonEC2Client;
|
import com.amazonaws.services.ec2.AmazonEC2Client;
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
|
@ -36,6 +39,7 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.settings.SettingsFilter;
|
import org.elasticsearch.common.settings.SettingsFilter;
|
||||||
|
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -103,6 +107,24 @@ public class AwsEc2ServiceImpl extends AbstractLifecycleComponent<AwsEc2Service>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Increase the number of retries in case of 5xx API responses
|
||||||
|
final Random rand = new Random();
|
||||||
|
RetryPolicy retryPolicy = new RetryPolicy(
|
||||||
|
RetryPolicy.RetryCondition.NO_RETRY_CONDITION,
|
||||||
|
new RetryPolicy.BackoffStrategy() {
|
||||||
|
@Override
|
||||||
|
public long delayBeforeNextRetry(AmazonWebServiceRequest originalRequest,
|
||||||
|
AmazonClientException exception,
|
||||||
|
int retriesAttempted) {
|
||||||
|
// with 10 retries the max delay time is 320s/320000ms (10 * 2^5 * 1 * 1000)
|
||||||
|
logger.warn("EC2 API request failed, retry again. Reason was:", exception);
|
||||||
|
return 1000L * (long) (10d * Math.pow(2, ((double) retriesAttempted) / 2.0d) * (1.0d + rand.nextDouble()));
|
||||||
|
}
|
||||||
|
},
|
||||||
|
10,
|
||||||
|
false);
|
||||||
|
clientConfiguration.setRetryPolicy(retryPolicy);
|
||||||
|
|
||||||
AWSCredentialsProvider credentials;
|
AWSCredentialsProvider credentials;
|
||||||
|
|
||||||
if (account == null && key == null) {
|
if (account == null && key == null) {
|
||||||
|
|
|
@ -31,6 +31,8 @@ import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.transport.TransportAddress;
|
import org.elasticsearch.common.transport.TransportAddress;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
import org.elasticsearch.common.util.SingleObjectCache;
|
||||||
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
|
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
|
@ -64,6 +66,8 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
|
||||||
|
|
||||||
private final HostType hostType;
|
private final HostType hostType;
|
||||||
|
|
||||||
|
private final DiscoNodesCache discoNodes;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public AwsEc2UnicastHostsProvider(Settings settings, TransportService transportService, AwsEc2Service awsEc2Service, Version version) {
|
public AwsEc2UnicastHostsProvider(Settings settings, TransportService transportService, AwsEc2Service awsEc2Service, Version version) {
|
||||||
super(settings);
|
super(settings);
|
||||||
|
@ -74,6 +78,9 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
|
||||||
this.hostType = HostType.valueOf(settings.get(DISCOVERY_EC2.HOST_TYPE, "private_ip")
|
this.hostType = HostType.valueOf(settings.get(DISCOVERY_EC2.HOST_TYPE, "private_ip")
|
||||||
.toUpperCase(Locale.ROOT));
|
.toUpperCase(Locale.ROOT));
|
||||||
|
|
||||||
|
this.discoNodes = new DiscoNodesCache(this.settings.getAsTime(DISCOVERY_EC2.NODE_CACHE_TIME,
|
||||||
|
TimeValue.timeValueMillis(10_000L)));
|
||||||
|
|
||||||
this.bindAnyGroup = settings.getAsBoolean(DISCOVERY_EC2.ANY_GROUP, true);
|
this.bindAnyGroup = settings.getAsBoolean(DISCOVERY_EC2.ANY_GROUP, true);
|
||||||
this.groups = new HashSet<>();
|
this.groups = new HashSet<>();
|
||||||
groups.addAll(Arrays.asList(settings.getAsArray(DISCOVERY_EC2.GROUPS)));
|
groups.addAll(Arrays.asList(settings.getAsArray(DISCOVERY_EC2.GROUPS)));
|
||||||
|
@ -94,6 +101,11 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<DiscoveryNode> buildDynamicNodes() {
|
public List<DiscoveryNode> buildDynamicNodes() {
|
||||||
|
return discoNodes.getOrRefresh();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected List<DiscoveryNode> fetchDynamicNodes() {
|
||||||
|
|
||||||
List<DiscoveryNode> discoNodes = new ArrayList<>();
|
List<DiscoveryNode> discoNodes = new ArrayList<>();
|
||||||
|
|
||||||
DescribeInstancesResult descInstances;
|
DescribeInstancesResult descInstances;
|
||||||
|
@ -199,4 +211,25 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
|
||||||
|
|
||||||
return describeInstancesRequest;
|
return describeInstancesRequest;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private final class DiscoNodesCache extends SingleObjectCache<List<DiscoveryNode>> {
|
||||||
|
|
||||||
|
private boolean empty = true;
|
||||||
|
|
||||||
|
protected DiscoNodesCache(TimeValue refreshInterval) {
|
||||||
|
super(refreshInterval, new ArrayList<>());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean needsRefresh() {
|
||||||
|
return (empty || super.needsRefresh());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<DiscoveryNode> refresh() {
|
||||||
|
List<DiscoveryNode> nodes = fetchDynamicNodes();
|
||||||
|
empty = nodes.isEmpty();
|
||||||
|
return nodes;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.elasticsearch.common.transport.TransportAddress;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.test.transport.MockTransportService;
|
import org.elasticsearch.test.transport.MockTransportService;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
import org.elasticsearch.transport.TransportService;
|
||||||
import org.elasticsearch.transport.local.LocalTransport;
|
import org.elasticsearch.transport.local.LocalTransport;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -231,4 +232,47 @@ public class Ec2DiscoveryTests extends ESTestCase {
|
||||||
assertThat(discoveryNodes, hasSize(prodInstances));
|
assertThat(discoveryNodes, hasSize(prodInstances));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
abstract class DummyEc2HostProvider extends AwsEc2UnicastHostsProvider {
|
||||||
|
public int fetchCount = 0;
|
||||||
|
public DummyEc2HostProvider(Settings settings, TransportService transportService, AwsEc2Service service, Version version) {
|
||||||
|
super(settings, transportService, service, version);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testGetNodeListEmptyCache() throws Exception {
|
||||||
|
AwsEc2Service awsEc2Service = new AwsEc2ServiceMock(Settings.EMPTY, 1, null);
|
||||||
|
DummyEc2HostProvider provider = new DummyEc2HostProvider(Settings.EMPTY, transportService, awsEc2Service, Version.CURRENT) {
|
||||||
|
@Override
|
||||||
|
protected List<DiscoveryNode> fetchDynamicNodes() {
|
||||||
|
fetchCount++;
|
||||||
|
return new ArrayList<>();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
for (int i=0; i<3; i++) {
|
||||||
|
provider.buildDynamicNodes();
|
||||||
|
}
|
||||||
|
assertEquals(provider.fetchCount, is(3));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testGetNodeListCached() throws Exception {
|
||||||
|
Settings.Builder builder = Settings.settingsBuilder()
|
||||||
|
.put(DISCOVERY_EC2.NODE_CACHE_TIME, "500ms");
|
||||||
|
AwsEc2Service awsEc2Service = new AwsEc2ServiceMock(Settings.EMPTY, 1, null);
|
||||||
|
DummyEc2HostProvider provider = new DummyEc2HostProvider(builder.build(), transportService, awsEc2Service, Version.CURRENT) {
|
||||||
|
@Override
|
||||||
|
protected List<DiscoveryNode> fetchDynamicNodes() {
|
||||||
|
fetchCount++;
|
||||||
|
return Ec2DiscoveryTests.this.buildDynamicNodes(Settings.EMPTY, 1);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
for (int i=0; i<3; i++) {
|
||||||
|
provider.buildDynamicNodes();
|
||||||
|
}
|
||||||
|
assertEquals(provider.fetchCount, is(1));
|
||||||
|
Thread.sleep(1_000L); // wait for cache to expire
|
||||||
|
for (int i=0; i<3; i++) {
|
||||||
|
provider.buildDynamicNodes();
|
||||||
|
}
|
||||||
|
assertEquals(provider.fetchCount, is(2));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue