Merge pull request #19348 from rjernst/deguice_attrs

Remove CustomNodeAttributes extension point
This commit is contained in:
Ryan Ernst 2016-07-09 12:46:26 -07:00 committed by GitHub
commit 3832825a87
11 changed files with 176 additions and 273 deletions

View File

@ -31,7 +31,6 @@ import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.cluster.routing.DelayedAllocationService;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
@ -146,7 +145,6 @@ public class ClusterModule extends AbstractModule {
bind(ClusterInfoService.class).to(clusterInfoServiceImpl).asEagerSingleton();
bind(GatewayAllocator.class).asEagerSingleton();
bind(AllocationService.class).asEagerSingleton();
bind(DiscoveryNodeService.class).asEagerSingleton();
bind(ClusterService.class).toInstance(clusterService);
bind(NodeConnectionsService.class).asEagerSingleton();
bind(MetaDataCreateIndexService.class).asEagerSingleton();

View File

@ -35,9 +35,11 @@ import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.Supplier;
import static org.elasticsearch.common.transport.TransportAddressSerializers.addressToStream;
@ -187,6 +189,24 @@ public class DiscoveryNode implements Writeable, ToXContent {
this.roles = Collections.unmodifiableSet(rolesSet);
}
/** Creates a DiscoveryNode representing the local node. */
public static DiscoveryNode createLocal(Settings settings, TransportAddress publishAddress, String nodeIdSupplier) {
Map<String, String> attributes = new HashMap<>(Node.NODE_ATTRIBUTES.get(settings).getAsMap());
Set<DiscoveryNode.Role> roles = new HashSet<>();
if (Node.NODE_INGEST_SETTING.get(settings)) {
roles.add(DiscoveryNode.Role.INGEST);
}
if (Node.NODE_MASTER_SETTING.get(settings)) {
roles.add(DiscoveryNode.Role.MASTER);
}
if (Node.NODE_DATA_SETTING.get(settings)) {
roles.add(DiscoveryNode.Role.DATA);
}
return new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), nodeIdSupplier, publishAddress,
attributes, roles, Version.CURRENT);
}
/**
* Creates a new {@link DiscoveryNode} by reading from the stream provided as argument
* @param in the stream

View File

@ -1,88 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.node;
import org.elasticsearch.Version;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.node.Node;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;
/**
*/
public class DiscoveryNodeService extends AbstractComponent {
private final List<CustomAttributesProvider> customAttributesProviders = new CopyOnWriteArrayList<>();
@Inject
public DiscoveryNodeService(Settings settings) {
super(settings);
}
public DiscoveryNodeService addCustomAttributeProvider(CustomAttributesProvider customAttributesProvider) {
customAttributesProviders.add(customAttributesProvider);
return this;
}
public DiscoveryNode buildLocalNode(TransportAddress publishAddress, Supplier<String> nodeIdSupplier) {
Map<String, String> attributes = new HashMap<>(Node.NODE_ATTRIBUTES.get(this.settings).getAsMap());
Set<DiscoveryNode.Role> roles = new HashSet<>();
if (Node.NODE_INGEST_SETTING.get(settings)) {
roles.add(DiscoveryNode.Role.INGEST);
}
if (Node.NODE_MASTER_SETTING.get(settings)) {
roles.add(DiscoveryNode.Role.MASTER);
}
if (Node.NODE_DATA_SETTING.get(settings)) {
roles.add(DiscoveryNode.Role.DATA);
}
for (CustomAttributesProvider provider : customAttributesProviders) {
try {
Map<String, String> customAttributes = provider.buildAttributes();
if (customAttributes != null) {
for (Map.Entry<String, String> entry : customAttributes.entrySet()) {
if (!attributes.containsKey(entry.getKey())) {
attributes.put(entry.getKey(), entry.getValue());
}
}
}
} catch (Exception e) {
logger.warn("failed to build custom attributes from provider [{}]", e, provider);
}
}
return new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), nodeIdSupplier.get(), publishAddress, attributes, roles,
Version.CURRENT);
}
public interface CustomAttributesProvider {
Map<String, String> buildAttributes();
}
}

View File

@ -37,7 +37,6 @@ import org.elasticsearch.cluster.MasterNodeChangePredicate;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
@ -415,8 +414,8 @@ public class Node implements Closeable {
validateNodeBeforeAcceptingRequests(settings, transportService.boundAddress());
DiscoveryNode localNode = injector.getInstance(DiscoveryNodeService.class)
.buildLocalNode(transportService.boundAddress().publishAddress(), injector.getInstance(NodeEnvironment.class)::nodeId);
DiscoveryNode localNode = DiscoveryNode.createLocal(settings,
transportService.boundAddress().publishAddress(), injector.getInstance(NodeEnvironment.class).nodeId());
// TODO: need to find a cleaner way to start/construct a service with some initial parameters,
// playing nice with the life cycle interfaces

View File

@ -1,85 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.node;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.test.ESTestCase;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.hamcrest.CoreMatchers.equalTo;
public class DiscoveryNodeServiceTests extends ESTestCase {
public void testBuildLocalNode() {
Map<String, String> expectedAttributes = new HashMap<>();
int numCustomSettings = randomIntBetween(0, 5);
Settings.Builder builder = Settings.builder();
for (int i = 0; i < numCustomSettings; i++) {
builder.put("node.attr.attr" + i, "value" + i);
expectedAttributes.put("attr" + i, "value" + i);
}
Set<DiscoveryNode.Role> selectedRoles = new HashSet<>();
for (DiscoveryNode.Role role : DiscoveryNode.Role.values()) {
if (randomBoolean()) {
//test default true for every role
selectedRoles.add(role);
} else {
boolean isRoleEnabled = randomBoolean();
builder.put("node." + role.getRoleName(), isRoleEnabled);
if (isRoleEnabled) {
selectedRoles.add(role);
}
}
}
DiscoveryNodeService discoveryNodeService = new DiscoveryNodeService(builder.build());
DiscoveryNode discoveryNode = discoveryNodeService.buildLocalNode(LocalTransportAddress.buildUnique(),
() -> UUIDs.randomBase64UUID(random()));
assertThat(discoveryNode.getRoles(), equalTo(selectedRoles));
assertThat(discoveryNode.getAttributes(), equalTo(expectedAttributes));
}
public void testBuildAttributesWithCustomAttributeServiceProvider() {
Map<String, String> expectedAttributes = new HashMap<>();
int numCustomSettings = randomIntBetween(0, 5);
Settings.Builder builder = Settings.builder();
for (int i = 0; i < numCustomSettings; i++) {
builder.put("node.attr.attr" + i, "value" + i);
expectedAttributes.put("attr" + i, "value" + i);
}
DiscoveryNodeService discoveryNodeService = new DiscoveryNodeService(builder.build());
int numCustomAttributes = randomIntBetween(0, 5);
Map<String, String> customAttributes = new HashMap<>();
for (int i = 0; i < numCustomAttributes; i++) {
customAttributes.put("custom-" + randomAsciiOfLengthBetween(5, 10), randomAsciiOfLengthBetween(1, 10));
}
expectedAttributes.putAll(customAttributes);
discoveryNodeService.addCustomAttributeProvider(() -> customAttributes);
DiscoveryNode discoveryNode = discoveryNodeService.buildLocalNode(LocalTransportAddress.buildUnique(),
() -> UUIDs.randomBase64UUID(random()));
assertThat(discoveryNode.getAttributes(), equalTo(expectedAttributes));
}
}

View File

@ -30,7 +30,6 @@ import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
@ -161,9 +160,8 @@ public class PublishClusterStateActionTests extends ESTestCase {
.build();
MockTransportService service = buildTransportService(settings);
DiscoveryNodeService discoveryNodeService = new DiscoveryNodeService(settings);
DiscoveryNode discoveryNode = discoveryNodeService.buildLocalNode(service.boundAddress().publishAddress(),
() -> NodeEnvironment.generateNodeId(settings));
DiscoveryNode discoveryNode = DiscoveryNode.createLocal(settings, service.boundAddress().publishAddress(),
NodeEnvironment.generateNodeId(settings));
MockNode node = new MockNode(discoveryNode, service, listener, logger);
node.action = buildPublishClusterStateAction(settings, service, () -> node.clusterState, node);
final CountDownLatch latch = new CountDownLatch(nodes.size() * 2 + 1);

View File

@ -35,8 +35,6 @@ import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.AmazonEC2Client;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cloud.aws.network.Ec2NameResolver;
import org.elasticsearch.cloud.aws.node.Ec2CustomNodeAttributes;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
@ -56,11 +54,10 @@ public class AwsEc2ServiceImpl extends AbstractLifecycleComponent implements Aws
private AmazonEC2Client client;
@Inject
public AwsEc2ServiceImpl(Settings settings, NetworkService networkService, DiscoveryNodeService discoveryNodeService) {
public AwsEc2ServiceImpl(Settings settings, NetworkService networkService) {
super(settings);
// add specific ec2 name resolver
networkService.addCustomNameResolver(new Ec2NameResolver(settings));
discoveryNodeService.addCustomAttributeProvider(new Ec2CustomNodeAttributes(settings));
}
@Override

View File

@ -1,78 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws.node;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cloud.aws.AwsEc2ServiceImpl;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
/**
*/
public class Ec2CustomNodeAttributes extends AbstractComponent implements DiscoveryNodeService.CustomAttributesProvider {
public Ec2CustomNodeAttributes(Settings settings) {
super(settings);
}
@Override
public Map<String, String> buildAttributes() {
if (AwsEc2Service.AUTO_ATTRIBUTE_SETTING.get(settings) == false) {
return null;
}
Map<String, String> ec2Attributes = new HashMap<>();
URLConnection urlConnection;
InputStream in = null;
try {
URL url = new URL(AwsEc2ServiceImpl.EC2_METADATA_URL + "placement/availability-zone");
logger.debug("obtaining ec2 [placement/availability-zone] from ec2 meta-data url {}", url);
urlConnection = url.openConnection();
urlConnection.setConnectTimeout(2000);
in = urlConnection.getInputStream();
BufferedReader urlReader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
String metadataResult = urlReader.readLine();
if (metadataResult == null || metadataResult.length() == 0) {
logger.error("no ec2 metadata returned from {}", url);
return null;
}
ec2Attributes.put("aws_availability_zone", metadataResult);
} catch (IOException e) {
logger.debug("failed to get metadata for [placement/availability-zone]", e);
} finally {
IOUtils.closeWhileHandlingException(in);
}
return ec2Attributes;
}
}

View File

@ -19,6 +19,21 @@
package org.elasticsearch.plugin.discovery.ec2;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cloud.aws.AwsEc2ServiceImpl;
@ -29,24 +44,19 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.ec2.AwsEc2UnicastHostsProvider;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
/**
*
*/
public class Ec2DiscoveryPlugin extends Plugin {
private static ESLogger logger = Loggers.getLogger(Ec2DiscoveryPlugin.class);
public static final String EC2 = "ec2";
// ClientConfiguration clinit has some classloader problems
@ -69,6 +79,12 @@ public class Ec2DiscoveryPlugin extends Plugin {
});
}
private Settings settings;
public Ec2DiscoveryPlugin(Settings settings) {
this.settings = settings;
}
@Override
public Collection<Module> nodeModules() {
Collection<Module> modules = new ArrayList<>();
@ -123,4 +139,46 @@ public class Ec2DiscoveryPlugin extends Plugin {
// Register cloud node settings: cloud.node
AwsEc2Service.AUTO_ATTRIBUTE_SETTING);
}
/** Adds a node attribute for the ec2 availability zone. */
@Override
public Settings additionalSettings() {
return getAvailabilityZoneNodeAttributes(settings, AwsEc2ServiceImpl.EC2_METADATA_URL + "placement/availability-zone");
}
// pkg private for testing
static Settings getAvailabilityZoneNodeAttributes(Settings settings, String azMetadataUrl) {
if (AwsEc2Service.AUTO_ATTRIBUTE_SETTING.get(settings) == false) {
return Settings.EMPTY;
}
Settings.Builder attrs = Settings.builder();
final URL url;
final URLConnection urlConnection;
try {
url = new URL(azMetadataUrl);
logger.debug("obtaining ec2 [placement/availability-zone] from ec2 meta-data url {}", url);
urlConnection = url.openConnection();
urlConnection.setConnectTimeout(2000);
} catch (IOException e) {
// should not happen, we know the url is not malformed, and openConnection does not actually hit network
throw new UncheckedIOException(e);
}
try (InputStream in = urlConnection.getInputStream();
BufferedReader urlReader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
String metadataResult = urlReader.readLine();
if (metadataResult == null || metadataResult.length() == 0) {
throw new IllegalStateException("no ec2 metadata returned from " + url);
} else {
attrs.put(Node.NODE_ATTRIBUTES.getKey() + "aws_availability_zone", metadataResult);
}
} catch (IOException e) {
// this is lenient so the plugin does not fail when installed outside of ec2
logger.error("failed to get metadata for [placement/availability-zone]", e);
}
return attrs.build();
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cloud.aws;
import com.amazonaws.ClientConfiguration;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.discovery.ec2.Ec2DiscoveryPlugin;
import org.elasticsearch.test.ESTestCase;
import org.junit.BeforeClass;
@ -34,7 +35,7 @@ public class AWSSignersTests extends ESTestCase {
*/
@BeforeClass
public static void instantiatePlugin() {
new Ec2DiscoveryPlugin();
new Ec2DiscoveryPlugin(Settings.EMPTY);
}
public void testSigners() {

View File

@ -0,0 +1,83 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.discovery.ec2;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESTestCase;
public class Ec2DiscoveryPluginTests extends ESTestCase {
private Settings getNodeAttributes(Settings settings, String url) {
Settings realSettings = Settings.builder()
.put(AwsEc2Service.AUTO_ATTRIBUTE_SETTING.getKey(), true)
.put(settings).build();
return Ec2DiscoveryPlugin.getAvailabilityZoneNodeAttributes(realSettings, url);
}
private void assertNodeAttributes(Settings settings, String url, String expected) {
Settings additional = getNodeAttributes(settings, url);
if (expected == null) {
assertTrue(additional.isEmpty());
} else {
assertEquals(expected, additional.get(Node.NODE_ATTRIBUTES.getKey() + "aws_availability_zone"));
}
}
public void testNodeAttributesDisabled() {
Settings settings = Settings.builder()
.put(AwsEc2Service.AUTO_ATTRIBUTE_SETTING.getKey(), false).build();
assertNodeAttributes(settings, "bogus", null);
}
public void testNodeAttributes() throws Exception {
Path zoneUrl = createTempFile();
Files.write(zoneUrl, Arrays.asList("us-east-1c"));
assertNodeAttributes(Settings.EMPTY, zoneUrl.toUri().toURL().toString(), "us-east-1c");
}
public void testNodeAttributesBogusUrl() {
UncheckedIOException e = expectThrows(UncheckedIOException.class, () ->
getNodeAttributes(Settings.EMPTY, "bogus")
);
assertNotNull(e.getCause());
String msg = e.getCause().getMessage();
assertTrue(msg, msg.contains("no protocol: bogus"));
}
public void testNodeAttributesEmpty() throws Exception {
Path zoneUrl = createTempFile();
IllegalStateException e = expectThrows(IllegalStateException.class, () ->
getNodeAttributes(Settings.EMPTY, zoneUrl.toUri().toURL().toString())
);
assertTrue(e.getMessage(), e.getMessage().contains("no ec2 metadata returned"));
}
public void testNodeAttributesErrorLenient() throws Exception {
Path dne = createTempDir().resolve("dne");
assertNodeAttributes(Settings.EMPTY, dne.toUri().toURL().toString(), null);
}
}