Don't allow nodes with missing custom data types to join cluster

This commit is contained in:
Areek Zillur 2015-12-11 18:33:59 -05:00
parent 3a95516b79
commit 9d9b557cea
7 changed files with 196 additions and 105 deletions

View File

@ -129,7 +129,7 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
T proto = (T)customPrototypes.get(type); T proto = (T)customPrototypes.get(type);
if (proto == null) { if (proto == null) {
throw new IllegalArgumentException("No custom state prototype registered for type [" + type + "]"); throw new IllegalArgumentException("No custom state prototype registered for type [" + type + "], node likely missing plugins");
} }
return proto; return proto;
} }

View File

@ -134,7 +134,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, Fr
//noinspection unchecked //noinspection unchecked
T proto = (T) customPrototypes.get(type); T proto = (T) customPrototypes.get(type);
if (proto == null) { if (proto == null) {
throw new IllegalArgumentException("No custom metadata prototype registered for type [" + type + "]"); throw new IllegalArgumentException("No custom metadata prototype registered for type [" + type + "], node likely missing plugins");
} }
return proto; return proto;
} }

View File

@ -836,8 +836,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
} }
} }
void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) { void handleJoinRequest(final DiscoveryNode node, final ClusterState state, final MembershipAction.JoinCallback callback) {
if (!transportService.addressSupported(node.address().getClass())) { if (!transportService.addressSupported(node.address().getClass())) {
// TODO, what should we do now? Maybe inform that node that its crap? // TODO, what should we do now? Maybe inform that node that its crap?
logger.warn("received a wrong address type from [{}], ignoring...", node); logger.warn("received a wrong address type from [{}], ignoring...", node);
@ -859,7 +858,13 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
// validate the join request, will throw a failure if it fails, which will get back to the // validate the join request, will throw a failure if it fails, which will get back to the
// node calling the join request // node calling the join request
membership.sendValidateJoinRequestBlocking(node, joinTimeout); try {
membership.sendValidateJoinRequestBlocking(node, state, joinTimeout);
} catch (Throwable e) {
logger.warn("failed to validate incoming join request from node [{}]", node);
callback.onFailure(new IllegalStateException("failure when sending a validation request to node", e));
return;
}
nodeJoinController.handleJoinRequest(node, callback); nodeJoinController.handleJoinRequest(node, callback);
} }
} }
@ -1039,7 +1044,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
private class MembershipListener implements MembershipAction.MembershipListener { private class MembershipListener implements MembershipAction.MembershipListener {
@Override @Override
public void onJoin(DiscoveryNode node, MembershipAction.JoinCallback callback) { public void onJoin(DiscoveryNode node, MembershipAction.JoinCallback callback) {
handleJoinRequest(node, callback); handleJoinRequest(node, clusterService.state(), callback);
} }
@Override @Override

View File

@ -20,6 +20,7 @@
package org.elasticsearch.discovery.zen.membership; package org.elasticsearch.discovery.zen.membership;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -88,10 +89,6 @@ public class MembershipAction extends AbstractComponent {
transportService.submitRequest(masterNode, DISCOVERY_LEAVE_ACTION_NAME, new LeaveRequest(node), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS); transportService.submitRequest(masterNode, DISCOVERY_LEAVE_ACTION_NAME, new LeaveRequest(node), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS);
} }
public void sendJoinRequest(DiscoveryNode masterNode, DiscoveryNode node) {
transportService.sendRequest(masterNode, DISCOVERY_JOIN_ACTION_NAME, new JoinRequest(node), EmptyTransportResponseHandler.INSTANCE_SAME);
}
public void sendJoinRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) { public void sendJoinRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) {
transportService.submitRequest(masterNode, DISCOVERY_JOIN_ACTION_NAME, new JoinRequest(node), EmptyTransportResponseHandler.INSTANCE_SAME) transportService.submitRequest(masterNode, DISCOVERY_JOIN_ACTION_NAME, new JoinRequest(node), EmptyTransportResponseHandler.INSTANCE_SAME)
.txGet(timeout.millis(), TimeUnit.MILLISECONDS); .txGet(timeout.millis(), TimeUnit.MILLISECONDS);
@ -100,8 +97,8 @@ public class MembershipAction extends AbstractComponent {
/** /**
* Validates the join request, throwing a failure if it failed. * Validates the join request, throwing a failure if it failed.
*/ */
public void sendValidateJoinRequestBlocking(DiscoveryNode node, TimeValue timeout) { public void sendValidateJoinRequestBlocking(DiscoveryNode node, ClusterState state, TimeValue timeout) {
transportService.submitRequest(node, DISCOVERY_JOIN_VALIDATE_ACTION_NAME, new ValidateJoinRequest(), EmptyTransportResponseHandler.INSTANCE_SAME) transportService.submitRequest(node, DISCOVERY_JOIN_VALIDATE_ACTION_NAME, new ValidateJoinRequest(state), EmptyTransportResponseHandler.INSTANCE_SAME)
.txGet(timeout.millis(), TimeUnit.MILLISECONDS); .txGet(timeout.millis(), TimeUnit.MILLISECONDS);
} }
@ -156,9 +153,26 @@ public class MembershipAction extends AbstractComponent {
} }
} }
public static class ValidateJoinRequest extends TransportRequest { class ValidateJoinRequest extends TransportRequest {
private ClusterState state;
public ValidateJoinRequest() { ValidateJoinRequest() {
}
ValidateJoinRequest(ClusterState state) {
this.state = state;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.state = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode());
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
this.state.writeTo(out);
} }
} }

View File

@ -24,10 +24,8 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
@ -45,6 +43,7 @@ import org.elasticsearch.discovery.zen.fd.FaultDetection;
import org.elasticsearch.discovery.zen.membership.MembershipAction; import org.elasticsearch.discovery.zen.membership.MembershipAction;
import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction; import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.TestCustomMetaData;
import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BytesTransportRequest; import org.elasticsearch.transport.BytesTransportRequest;
@ -57,9 +56,7 @@ import org.hamcrest.Matchers;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.*;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -228,16 +225,69 @@ public class ZenDiscoveryIT extends ESIntegTestCase {
assertThat(ExceptionsHelper.detailedMessage(reference.get()), containsString("cluster state from a different master than the current one, rejecting")); assertThat(ExceptionsHelper.detailedMessage(reference.get()), containsString("cluster state from a different master than the current one, rejecting"));
} }
public void testHandleNodeJoin_incompatibleClusterState() throws UnknownHostException {
Settings nodeSettings = Settings.settingsBuilder()
.put("discovery.type", "zen") // <-- To override the local setting if set externally
.build();
String masterOnlyNode = internalCluster().startMasterOnlyNode(nodeSettings);
String node1 = internalCluster().startNode(nodeSettings);
ZenDiscovery zenDiscovery = (ZenDiscovery) internalCluster().getInstance(Discovery.class, masterOnlyNode);
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, node1);
final ClusterState state = clusterService.state();
MetaData.Builder mdBuilder = MetaData.builder(state.metaData());
mdBuilder.putCustom(CustomMetaData.TYPE, new CustomMetaData("data"));
ClusterState stateWithCustomMetaData = ClusterState.builder(state).metaData(mdBuilder).build();
final AtomicReference<IllegalStateException> holder = new AtomicReference<>();
DiscoveryNode node = state.nodes().localNode();
zenDiscovery.handleJoinRequest(node, stateWithCustomMetaData, new MembershipAction.JoinCallback() {
@Override
public void onSuccess() {
}
@Override
public void onFailure(Throwable t) {
holder.set((IllegalStateException) t);
}
});
assertThat(holder.get(), notNullValue());
assertThat(holder.get().getMessage(), equalTo("failure when sending a validation request to node"));
}
public static class CustomMetaData extends TestCustomMetaData {
public static final String TYPE = "custom_md";
CustomMetaData(String data) {
super(data);
}
@Override
protected TestCustomMetaData newTestCustomMetaData(String data) {
return new CustomMetaData(data);
}
@Override
public String type() {
return TYPE;
}
@Override
public EnumSet<MetaData.XContentContext> context() {
return EnumSet.of(MetaData.XContentContext.GATEWAY, MetaData.XContentContext.SNAPSHOT);
}
}
public void testHandleNodeJoin_incompatibleMinVersion() throws UnknownHostException { public void testHandleNodeJoin_incompatibleMinVersion() throws UnknownHostException {
Settings nodeSettings = Settings.settingsBuilder() Settings nodeSettings = Settings.settingsBuilder()
.put("discovery.type", "zen") // <-- To override the local setting if set externally .put("discovery.type", "zen") // <-- To override the local setting if set externally
.build(); .build();
String nodeName = internalCluster().startNode(nodeSettings, Version.V_2_0_0_beta1); String nodeName = internalCluster().startNode(nodeSettings, Version.V_2_0_0_beta1);
ZenDiscovery zenDiscovery = (ZenDiscovery) internalCluster().getInstance(Discovery.class, nodeName); ZenDiscovery zenDiscovery = (ZenDiscovery) internalCluster().getInstance(Discovery.class, nodeName);
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, nodeName);
DiscoveryNode node = new DiscoveryNode("_node_id", new InetSocketTransportAddress(InetAddress.getByName("0.0.0.0"), 0), Version.V_1_6_0); DiscoveryNode node = new DiscoveryNode("_node_id", new InetSocketTransportAddress(InetAddress.getByName("0.0.0.0"), 0), Version.V_1_6_0);
final AtomicReference<IllegalStateException> holder = new AtomicReference<>(); final AtomicReference<IllegalStateException> holder = new AtomicReference<>();
zenDiscovery.handleJoinRequest(node, new MembershipAction.JoinCallback() { zenDiscovery.handleJoinRequest(node, clusterService.state(), new MembershipAction.JoinCallback() {
@Override @Override
public void onSuccess() { public void onSuccess() {
} }

View File

@ -22,7 +22,6 @@ package org.elasticsearch.snapshots;
import com.carrotsearch.hppc.IntHashSet; import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.hppc.IntSet; import com.carrotsearch.hppc.IntSet;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ListenableActionFuture; import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
@ -33,24 +32,17 @@ import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaData.Custom;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStore;
@ -68,9 +60,9 @@ import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.TestCustomMetaData;
import org.elasticsearch.test.rest.FakeRestRequest; import org.elasticsearch.test.rest.FakeRestRequest;
import java.io.IOException;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
@ -902,78 +894,6 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
)); ));
} }
public static abstract class TestCustomMetaData extends AbstractDiffable<Custom> implements MetaData.Custom {
private final String data;
protected TestCustomMetaData(String data) {
this.data = data;
}
public String getData() {
return data;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TestCustomMetaData that = (TestCustomMetaData) o;
if (!data.equals(that.data)) return false;
return true;
}
@Override
public int hashCode() {
return data.hashCode();
}
protected abstract TestCustomMetaData newTestCustomMetaData(String data);
@Override
public Custom readFrom(StreamInput in) throws IOException {
return newTestCustomMetaData(in.readString());
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(getData());
}
@Override
public Custom fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token;
String data = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
String currentFieldName = parser.currentName();
if ("data".equals(currentFieldName)) {
if (parser.nextToken() != XContentParser.Token.VALUE_STRING) {
throw new ElasticsearchParseException("failed to parse snapshottable metadata, invalid data type");
}
data = parser.text();
} else {
throw new ElasticsearchParseException("failed to parse snapshottable metadata, unknown field [{}]", currentFieldName);
}
} else {
throw new ElasticsearchParseException("failed to parse snapshottable metadata");
}
}
if (data == null) {
throw new ElasticsearchParseException("failed to parse snapshottable metadata, data not found");
}
return newTestCustomMetaData(data);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.field("data", getData());
return builder;
}
}
static { static {
MetaData.registerPrototype(SnapshottableMetadata.TYPE, SnapshottableMetadata.PROTO); MetaData.registerPrototype(SnapshottableMetadata.TYPE, SnapshottableMetadata.PROTO);

View File

@ -0,0 +1,102 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
public abstract class TestCustomMetaData extends AbstractDiffable<MetaData.Custom> implements MetaData.Custom {
private final String data;
protected TestCustomMetaData(String data) {
this.data = data;
}
public String getData() {
return data;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TestCustomMetaData that = (TestCustomMetaData) o;
if (!data.equals(that.data)) return false;
return true;
}
@Override
public int hashCode() {
return data.hashCode();
}
protected abstract TestCustomMetaData newTestCustomMetaData(String data);
@Override
public MetaData.Custom readFrom(StreamInput in) throws IOException {
return newTestCustomMetaData(in.readString());
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(getData());
}
@Override
public MetaData.Custom fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token;
String data = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
String currentFieldName = parser.currentName();
if ("data".equals(currentFieldName)) {
if (parser.nextToken() != XContentParser.Token.VALUE_STRING) {
throw new ElasticsearchParseException("failed to parse snapshottable metadata, invalid data type");
}
data = parser.text();
} else {
throw new ElasticsearchParseException("failed to parse snapshottable metadata, unknown field [{}]", currentFieldName);
}
} else {
throw new ElasticsearchParseException("failed to parse snapshottable metadata");
}
}
if (data == null) {
throw new ElasticsearchParseException("failed to parse snapshottable metadata, data not found");
}
return newTestCustomMetaData(data);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("data", getData());
return builder;
}
}