Plugins: Remove CustomNodeAttributes extension point

The DiscoveryNodeService exists to register CustomNodeAttributes which
plugins can add. This is not necessary, since plugins can already add
additional attributes, and use the node attributes prefix.

This change removes the DiscoveryNodeService, and converts the only
consumer, the ec2 discovery plugin, to add the ec2 availability zone
in additionalSettings().
This commit is contained in:
Ryan Ernst 2016-07-08 21:37:37 -07:00
parent dea00a0b16
commit 2b9d4bdf85
10 changed files with 88 additions and 273 deletions

View File

@ -31,7 +31,6 @@ import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.cluster.routing.DelayedAllocationService;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
@ -146,7 +145,6 @@ public class ClusterModule extends AbstractModule {
bind(ClusterInfoService.class).to(clusterInfoServiceImpl).asEagerSingleton();
bind(GatewayAllocator.class).asEagerSingleton();
bind(AllocationService.class).asEagerSingleton();
bind(DiscoveryNodeService.class).asEagerSingleton();
bind(ClusterService.class).toInstance(clusterService);
bind(NodeConnectionsService.class).asEagerSingleton();
bind(MetaDataCreateIndexService.class).asEagerSingleton();

View File

@ -35,9 +35,11 @@ import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.Supplier;
import static org.elasticsearch.common.transport.TransportAddressSerializers.addressToStream;
@ -187,6 +189,24 @@ public class DiscoveryNode implements Writeable, ToXContent {
this.roles = Collections.unmodifiableSet(rolesSet);
}
/** Creates a DiscoveryNode representing the local node. */
public static DiscoveryNode createLocal(Settings settings, TransportAddress publishAddress, String nodeIdSupplier) {
Map<String, String> attributes = new HashMap<>(Node.NODE_ATTRIBUTES.get(settings).getAsMap());
Set<DiscoveryNode.Role> roles = new HashSet<>();
if (Node.NODE_INGEST_SETTING.get(settings)) {
roles.add(DiscoveryNode.Role.INGEST);
}
if (Node.NODE_MASTER_SETTING.get(settings)) {
roles.add(DiscoveryNode.Role.MASTER);
}
if (Node.NODE_DATA_SETTING.get(settings)) {
roles.add(DiscoveryNode.Role.DATA);
}
return new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), nodeIdSupplier, publishAddress,
attributes, roles, Version.CURRENT);
}
/**
* Creates a new {@link DiscoveryNode} by reading from the stream provided as argument
* @param in the stream

View File

@ -1,88 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.node;
import org.elasticsearch.Version;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.node.Node;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;
/**
*/
public class DiscoveryNodeService extends AbstractComponent {
private final List<CustomAttributesProvider> customAttributesProviders = new CopyOnWriteArrayList<>();
@Inject
public DiscoveryNodeService(Settings settings) {
super(settings);
}
public DiscoveryNodeService addCustomAttributeProvider(CustomAttributesProvider customAttributesProvider) {
customAttributesProviders.add(customAttributesProvider);
return this;
}
public DiscoveryNode buildLocalNode(TransportAddress publishAddress, Supplier<String> nodeIdSupplier) {
Map<String, String> attributes = new HashMap<>(Node.NODE_ATTRIBUTES.get(this.settings).getAsMap());
Set<DiscoveryNode.Role> roles = new HashSet<>();
if (Node.NODE_INGEST_SETTING.get(settings)) {
roles.add(DiscoveryNode.Role.INGEST);
}
if (Node.NODE_MASTER_SETTING.get(settings)) {
roles.add(DiscoveryNode.Role.MASTER);
}
if (Node.NODE_DATA_SETTING.get(settings)) {
roles.add(DiscoveryNode.Role.DATA);
}
for (CustomAttributesProvider provider : customAttributesProviders) {
try {
Map<String, String> customAttributes = provider.buildAttributes();
if (customAttributes != null) {
for (Map.Entry<String, String> entry : customAttributes.entrySet()) {
if (!attributes.containsKey(entry.getKey())) {
attributes.put(entry.getKey(), entry.getValue());
}
}
}
} catch (Exception e) {
logger.warn("failed to build custom attributes from provider [{}]", e, provider);
}
}
return new DiscoveryNode(Node.NODE_NAME_SETTING.get(settings), nodeIdSupplier.get(), publishAddress, attributes, roles,
Version.CURRENT);
}
public interface CustomAttributesProvider {
Map<String, String> buildAttributes();
}
}

View File

@ -37,7 +37,6 @@ import org.elasticsearch.cluster.MasterNodeChangePredicate;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
@ -415,8 +414,8 @@ public class Node implements Closeable {
validateNodeBeforeAcceptingRequests(settings, transportService.boundAddress());
DiscoveryNode localNode = injector.getInstance(DiscoveryNodeService.class)
.buildLocalNode(transportService.boundAddress().publishAddress(), injector.getInstance(NodeEnvironment.class)::nodeId);
DiscoveryNode localNode = DiscoveryNode.createLocal(settings,
transportService.boundAddress().publishAddress(), injector.getInstance(NodeEnvironment.class).nodeId());
// TODO: need to find a cleaner way to start/construct a service with some initial parameters,
// playing nice with the life cycle interfaces

View File

@ -1,85 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.node;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.test.ESTestCase;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.hamcrest.CoreMatchers.equalTo;
public class DiscoveryNodeServiceTests extends ESTestCase {
public void testBuildLocalNode() {
Map<String, String> expectedAttributes = new HashMap<>();
int numCustomSettings = randomIntBetween(0, 5);
Settings.Builder builder = Settings.builder();
for (int i = 0; i < numCustomSettings; i++) {
builder.put("node.attr.attr" + i, "value" + i);
expectedAttributes.put("attr" + i, "value" + i);
}
Set<DiscoveryNode.Role> selectedRoles = new HashSet<>();
for (DiscoveryNode.Role role : DiscoveryNode.Role.values()) {
if (randomBoolean()) {
//test default true for every role
selectedRoles.add(role);
} else {
boolean isRoleEnabled = randomBoolean();
builder.put("node." + role.getRoleName(), isRoleEnabled);
if (isRoleEnabled) {
selectedRoles.add(role);
}
}
}
DiscoveryNodeService discoveryNodeService = new DiscoveryNodeService(builder.build());
DiscoveryNode discoveryNode = discoveryNodeService.buildLocalNode(LocalTransportAddress.buildUnique(),
() -> UUIDs.randomBase64UUID(random()));
assertThat(discoveryNode.getRoles(), equalTo(selectedRoles));
assertThat(discoveryNode.getAttributes(), equalTo(expectedAttributes));
}
public void testBuildAttributesWithCustomAttributeServiceProvider() {
Map<String, String> expectedAttributes = new HashMap<>();
int numCustomSettings = randomIntBetween(0, 5);
Settings.Builder builder = Settings.builder();
for (int i = 0; i < numCustomSettings; i++) {
builder.put("node.attr.attr" + i, "value" + i);
expectedAttributes.put("attr" + i, "value" + i);
}
DiscoveryNodeService discoveryNodeService = new DiscoveryNodeService(builder.build());
int numCustomAttributes = randomIntBetween(0, 5);
Map<String, String> customAttributes = new HashMap<>();
for (int i = 0; i < numCustomAttributes; i++) {
customAttributes.put("custom-" + randomAsciiOfLengthBetween(5, 10), randomAsciiOfLengthBetween(1, 10));
}
expectedAttributes.putAll(customAttributes);
discoveryNodeService.addCustomAttributeProvider(() -> customAttributes);
DiscoveryNode discoveryNode = discoveryNodeService.buildLocalNode(LocalTransportAddress.buildUnique(),
() -> UUIDs.randomBase64UUID(random()));
assertThat(discoveryNode.getAttributes(), equalTo(expectedAttributes));
}
}

View File

@ -30,7 +30,6 @@ import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
@ -161,9 +160,8 @@ public class PublishClusterStateActionTests extends ESTestCase {
.build();
MockTransportService service = buildTransportService(settings);
DiscoveryNodeService discoveryNodeService = new DiscoveryNodeService(settings);
DiscoveryNode discoveryNode = discoveryNodeService.buildLocalNode(service.boundAddress().publishAddress(),
() -> NodeEnvironment.generateNodeId(settings));
DiscoveryNode discoveryNode = DiscoveryNode.createLocal(settings, service.boundAddress().publishAddress(),
NodeEnvironment.generateNodeId(settings));
MockNode node = new MockNode(discoveryNode, service, listener, logger);
node.action = buildPublishClusterStateAction(settings, service, () -> node.clusterState, node);
final CountDownLatch latch = new CountDownLatch(nodes.size() * 2 + 1);

View File

@ -35,8 +35,6 @@ import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.AmazonEC2Client;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cloud.aws.network.Ec2NameResolver;
import org.elasticsearch.cloud.aws.node.Ec2CustomNodeAttributes;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
@ -56,11 +54,10 @@ public class AwsEc2ServiceImpl extends AbstractLifecycleComponent implements Aws
private AmazonEC2Client client;
@Inject
public AwsEc2ServiceImpl(Settings settings, NetworkService networkService, DiscoveryNodeService discoveryNodeService) {
public AwsEc2ServiceImpl(Settings settings, NetworkService networkService) {
super(settings);
// add specific ec2 name resolver
networkService.addCustomNameResolver(new Ec2NameResolver(settings));
discoveryNodeService.addCustomAttributeProvider(new Ec2CustomNodeAttributes(settings));
}
@Override

View File

@ -1,78 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws.node;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cloud.aws.AwsEc2ServiceImpl;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
/**
*/
public class Ec2CustomNodeAttributes extends AbstractComponent implements DiscoveryNodeService.CustomAttributesProvider {
public Ec2CustomNodeAttributes(Settings settings) {
super(settings);
}
@Override
public Map<String, String> buildAttributes() {
if (AwsEc2Service.AUTO_ATTRIBUTE_SETTING.get(settings) == false) {
return null;
}
Map<String, String> ec2Attributes = new HashMap<>();
URLConnection urlConnection;
InputStream in = null;
try {
URL url = new URL(AwsEc2ServiceImpl.EC2_METADATA_URL + "placement/availability-zone");
logger.debug("obtaining ec2 [placement/availability-zone] from ec2 meta-data url {}", url);
urlConnection = url.openConnection();
urlConnection.setConnectTimeout(2000);
in = urlConnection.getInputStream();
BufferedReader urlReader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
String metadataResult = urlReader.readLine();
if (metadataResult == null || metadataResult.length() == 0) {
logger.error("no ec2 metadata returned from {}", url);
return null;
}
ec2Attributes.put("aws_availability_zone", metadataResult);
} catch (IOException e) {
logger.debug("failed to get metadata for [placement/availability-zone]", e);
} finally {
IOUtils.closeWhileHandlingException(in);
}
return ec2Attributes;
}
}

View File

@ -19,6 +19,21 @@
package org.elasticsearch.plugin.discovery.ec2;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cloud.aws.AwsEc2ServiceImpl;
@ -29,24 +44,19 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.ec2.AwsEc2UnicastHostsProvider;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
/**
*
*/
public class Ec2DiscoveryPlugin extends Plugin {
private static ESLogger logger = Loggers.getLogger(Ec2DiscoveryPlugin.class);
public static final String EC2 = "ec2";
// ClientConfiguration clinit has some classloader problems
@ -69,6 +79,12 @@ public class Ec2DiscoveryPlugin extends Plugin {
});
}
private Settings settings;
public Ec2DiscoveryPlugin(Settings settings) {
this.settings = settings;
}
@Override
public Collection<Module> nodeModules() {
Collection<Module> modules = new ArrayList<>();
@ -123,4 +139,41 @@ public class Ec2DiscoveryPlugin extends Plugin {
// Register cloud node settings: cloud.node
AwsEc2Service.AUTO_ATTRIBUTE_SETTING);
}
/** Adds a node attribute for the ec2 availability zone. */
@Override
public Settings additionalSettings() {
if (AwsEc2Service.AUTO_ATTRIBUTE_SETTING.get(settings) == false) {
return Settings.EMPTY;
}
Settings.Builder attrs = Settings.builder();
final URL url;
final URLConnection urlConnection;
try {
url = new URL(AwsEc2ServiceImpl.EC2_METADATA_URL + "placement/availability-zone");
logger.debug("obtaining ec2 [placement/availability-zone] from ec2 meta-data url {}", url);
urlConnection = url.openConnection();
urlConnection.setConnectTimeout(2000);
} catch (IOException e) {
// should not happen, we know the url is not malformed, and openConnection does not actually hit network
throw new UncheckedIOException(e);
}
try (InputStream in = urlConnection.getInputStream();
BufferedReader urlReader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
String metadataResult = urlReader.readLine();
if (metadataResult == null || metadataResult.length() == 0) {
throw new IOException("no ec2 metadata returned from " + url);
} else {
attrs.put(Node.NODE_ATTRIBUTES.getKey() + "aws_availability_zone", metadataResult);
}
} catch (IOException e) {
// this is lenient so the plugin does not fail when installed outside of ec2
logger.error("failed to get metadata for [placement/availability-zone]", e);
}
return attrs.build();
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cloud.aws;
import com.amazonaws.ClientConfiguration;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.discovery.ec2.Ec2DiscoveryPlugin;
import org.elasticsearch.test.ESTestCase;
import org.junit.BeforeClass;
@ -34,7 +35,7 @@ public class AWSSignersTests extends ESTestCase {
*/
@BeforeClass
public static void instantiatePlugin() {
new Ec2DiscoveryPlugin();
new Ec2DiscoveryPlugin(Settings.EMPTY);
}
public void testSigners() {