diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index b5474a05ec2..ad5538f6537 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -27,6 +27,10 @@ import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterAction; import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesAction; import org.elasticsearch.action.admin.cluster.bootstrap.TransportBootstrapClusterAction; import org.elasticsearch.action.admin.cluster.bootstrap.TransportGetDiscoveredNodesAction; +import org.elasticsearch.action.admin.cluster.configuration.AddVotingTombstonesAction; +import org.elasticsearch.action.admin.cluster.configuration.ClearVotingTombstonesAction; +import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingTombstonesAction; +import org.elasticsearch.action.admin.cluster.configuration.TransportClearVotingTombstonesAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction; import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAction; @@ -428,6 +432,8 @@ public class ActionModule extends AbstractModule { actions.register(GetDiscoveredNodesAction.INSTANCE, TransportGetDiscoveredNodesAction.class); actions.register(BootstrapClusterAction.INSTANCE, TransportBootstrapClusterAction.class); + actions.register(AddVotingTombstonesAction.INSTANCE, TransportAddVotingTombstonesAction.class); + actions.register(ClearVotingTombstonesAction.INSTANCE, TransportClearVotingTombstonesAction.class); actions.register(ClusterAllocationExplainAction.INSTANCE, TransportClusterAllocationExplainAction.class); actions.register(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class); actions.register(ClusterStateAction.INSTANCE, 
TransportClusterStateAction.class); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesAction.java new file mode 100644 index 00000000000..78650726ecf --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesAction.java @@ -0,0 +1,41 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.elasticsearch.action.admin.cluster.configuration;
+
+import org.elasticsearch.action.Action;
+import org.elasticsearch.common.io.stream.Writeable.Reader;
+
+/**
+ * Action descriptor for adding voting tombstones. The singleton {@link #INSTANCE} passes {@link #NAME} to the {@link Action}
+ * constructor, and responses are deserialized through {@link #getResponseReader()}.
+ */
+public class AddVotingTombstonesAction extends Action<AddVotingTombstonesResponse> {
+    // INSTANCE is initialised before NAME is declared; this is safe because NAME is a compile-time constant.
+    public static final AddVotingTombstonesAction INSTANCE = new AddVotingTombstonesAction();
+    public static final String NAME = "cluster:admin/voting/add_tombstones";
+
+    /** Private: the only instance is {@link #INSTANCE}. */
+    private AddVotingTombstonesAction() {
+        super(NAME);
+    }
+
+    /**
+     * Unsupported: the response type is only constructed from a stream, see {@link #getResponseReader()}.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public AddVotingTombstonesResponse newResponse() {
+        throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
+    }
+
+    /**
+     * @return a reader that builds an {@link AddVotingTombstonesResponse} from a stream.
+     */
+    @Override
+    public Reader<AddVotingTombstonesResponse> getResponseReader() {
+        return AddVotingTombstonesResponse::new;
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequest.java
new file mode 100644
index 00000000000..0ffc3d5567a
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.action.admin.cluster.configuration;
+
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.support.master.MasterNodeRequest;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.unit.TimeValue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * A request to add voting tombstones for certain master-eligible nodes, and wait for these nodes to be removed from the voting
+ * configuration.
+ */
+public class AddVotingTombstonesRequest extends MasterNodeRequest<AddVotingTombstonesRequest> {
+    private final String[] nodeDescriptions;
+    private final TimeValue timeout;
+
+    /**
+     * Construct a request to add voting tombstones for master-eligible nodes matching the given descriptions, and wait for a default 30
+     * seconds for these nodes to be removed from the voting configuration.
+     * @param nodeDescriptions Descriptions of the nodes to add - see {@link DiscoveryNodes#resolveNodes(String...)}
+     */
+    public AddVotingTombstonesRequest(String[] nodeDescriptions) {
+        this(nodeDescriptions, TimeValue.timeValueSeconds(30));
+    }
+
+    /**
+     * Construct a request to add voting tombstones for master-eligible nodes matching the given descriptions, and wait for these nodes to
+     * be removed from the voting configuration.
+     * @param nodeDescriptions Descriptions of the nodes whose tombstones to add - see {@link DiscoveryNodes#resolveNodes(String...)}.
+     * @param timeout How long to wait for the nodes to be removed from the voting configuration.
+     * @throws IllegalArgumentException if {@code timeout} is negative.
+     */
+    public AddVotingTombstonesRequest(String[] nodeDescriptions, TimeValue timeout) {
+        if (timeout.compareTo(TimeValue.ZERO) < 0) {
+            throw new IllegalArgumentException("timeout [" + timeout + "] must be non-negative");
+        }
+        this.nodeDescriptions = nodeDescriptions;
+        this.timeout = timeout;
+    }
+
+    /**
+     * Deserialize a request: the superclass fields first, then the node descriptions and the timeout, mirroring
+     * {@link #writeTo(StreamOutput)}.
+     */
+    public AddVotingTombstonesRequest(StreamInput in) throws IOException {
+        super(in);
+        nodeDescriptions = in.readStringArray();
+        timeout = in.readTimeValue();
+    }
+
+    /**
+     * Resolve the node descriptions against the given cluster state.
+     *
+     * @param currentState the cluster state whose nodes and existing voting tombstones are consulted.
+     * @return the matching master-eligible nodes that do not already have a voting tombstone; this may be empty if every matching node
+     *         is already tombstoned.
+     * @throws IllegalArgumentException if the descriptions match no master-eligible node at all.
+     */
+    Set<DiscoveryNode> resolveNodes(ClusterState currentState) {
+        final DiscoveryNodes allNodes = currentState.nodes();
+        final Set<DiscoveryNode> resolvedNodes = Arrays.stream(allNodes.resolveNodes(nodeDescriptions))
+            .map(allNodes::get).filter(DiscoveryNode::isMasterNode).collect(Collectors.toSet());
+
+        // The emptiness check happens before existing tombstones are filtered out, so re-requesting a tombstoned node is not an error.
+        if (resolvedNodes.isEmpty()) {
+            throw new IllegalArgumentException("add voting tombstones request for " + Arrays.asList(nodeDescriptions)
+                + " matched no master-eligible nodes");
+        }
+
+        final Set<DiscoveryNode> existingTombstones = currentState.getVotingTombstones();
+        resolvedNodes.removeIf(existingTombstones::contains);
+        return resolvedNodes;
+    }
+
+    /**
+     * Resolve the nodes as in {@link #resolveNodes(ClusterState)} and verify that adding them keeps the total number of voting
+     * tombstones within the given maximum.
+     *
+     * @param currentState the cluster state to resolve against.
+     * @param maxTombstoneCount the largest permitted total number of voting tombstones.
+     * @param maximumSettingKey the name of the setting that defines the maximum; only used in the error message.
+     * @return the nodes for which new tombstones should be added.
+     * @throws IllegalArgumentException if no master-eligible node matched, or if the existing plus new tombstones would exceed
+     *                                  {@code maxTombstoneCount}.
+     */
+    Set<DiscoveryNode> resolveNodesAndCheckMaximum(ClusterState currentState, int maxTombstoneCount, String maximumSettingKey) {
+        final Set<DiscoveryNode> resolvedNodes = resolveNodes(currentState);
+
+        final int oldTombstoneCount = currentState.getVotingTombstones().size();
+        final int newTombstoneCount = resolvedNodes.size();
+        if (oldTombstoneCount + newTombstoneCount > maxTombstoneCount) {
+            throw new IllegalArgumentException("add voting tombstones request for " + Arrays.asList(nodeDescriptions)
+                + " would add [" + newTombstoneCount + "] voting tombstones to the existing [" + oldTombstoneCount
+                + "] which would exceed the maximum of [" + maxTombstoneCount + "] set by ["
+                + maximumSettingKey + "]");
+        }
+        return resolvedNodes;
+    }
+
+    /**
+     * @return descriptions of the nodes for whom to add tombstones.
+     */
+    public String[] getNodeDescriptions() {
+        return nodeDescriptions;
+    }
+
+    /**
+     * @return how long to wait after adding the tombstones for the nodes to be removed from the voting configuration.
+     */
+    public TimeValue getTimeout() {
+        return timeout;
+    }
+
+    /**
+     * @return always {@code null}: no validation is performed here.
+     */
+    @Override
+    public ActionRequestValidationException validate() {
+        return null;
+    }
+
+    /**
+     * Unsupported: use {@link #AddVotingTombstonesRequest(StreamInput)} instead.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void readFrom(StreamInput in) throws IOException {
+        throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
+    }
+
+    /**
+     * Serialize the superclass fields, then the node descriptions and the timeout.
+     */
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        out.writeStringArray(nodeDescriptions);
+        out.writeTimeValue(timeout);
+    }
+
+    @Override
+    public String toString() {
+        return "AddVotingTombstonesRequest{" +
+            "nodeDescriptions=" + Arrays.asList(nodeDescriptions) +
+            ", timeout=" + timeout +
+            '}';
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesResponse.java
new file mode 100644
index 00000000000..2fee3c848c5
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesResponse.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.action.admin.cluster.configuration;
+
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+
+import java.io.IOException;
+
+/**
+ * The (field-less) reply to an {@link AddVotingTombstonesRequest}: receiving it means that voting tombstones have been added for the
+ * requested nodes and these nodes have been removed from the voting configuration.
+ */
+public class AddVotingTombstonesResponse extends ActionResponse {
+
+    /** Create a response; it declares no fields of its own. */
+    public AddVotingTombstonesResponse() {
+    }
+
+    /** Read a response from the wire; only the superclass reads from the stream. */
+    public AddVotingTombstonesResponse(StreamInput in) throws IOException {
+        super(in);
+    }
+
+    /**
+     * Not supported; deserialize with {@link #AddVotingTombstonesResponse(StreamInput)}.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void readFrom(StreamInput in) throws IOException {
+        throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
+    }
+
+    /** Write the response to the wire; nothing is written beyond what the superclass writes. */
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesAction.java
new file mode 100644
index 00000000000..3c6181ecb12
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesAction.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.action.admin.cluster.configuration;
+
+import org.elasticsearch.action.Action;
+import org.elasticsearch.common.io.stream.Writeable.Reader;
+
+/**
+ * Action descriptor for clearing voting tombstones. The singleton {@link #INSTANCE} passes {@link #NAME} to the {@link Action}
+ * constructor, and responses are deserialized through {@link #getResponseReader()}.
+ */
+public class ClearVotingTombstonesAction extends Action<ClearVotingTombstonesResponse> {
+    // INSTANCE is initialised before NAME is declared; this is safe because NAME is a compile-time constant.
+    public static final ClearVotingTombstonesAction INSTANCE = new ClearVotingTombstonesAction();
+    public static final String NAME = "cluster:admin/voting/clear_tombstones";
+
+    /** Private: the only instance is {@link #INSTANCE}. */
+    private ClearVotingTombstonesAction() {
+        super(NAME);
+    }
+
+    /**
+     * Unsupported: the response type is only constructed from a stream, see {@link #getResponseReader()}.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public ClearVotingTombstonesResponse newResponse() {
+        throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
+    }
+
+    /**
+     * @return a reader that builds a {@link ClearVotingTombstonesResponse} from a stream.
+     */
+    @Override
+    public Reader<ClearVotingTombstonesResponse> getResponseReader() {
+        return ClearVotingTombstonesResponse::new;
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesRequest.java
new file mode 100644
index 00000000000..84d3917e86e
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesRequest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.action.admin.cluster.configuration;
+
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.support.master.MasterNodeRequest;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.unit.TimeValue;
+
+import java.io.IOException;
+
+/**
+ * A request to clear the voting tombstones from the cluster state, optionally waiting for these nodes to be removed from the cluster first.
+ */
+public class ClearVotingTombstonesRequest extends MasterNodeRequest<ClearVotingTombstonesRequest> {
+    private boolean waitForRemoval = true;
+    private TimeValue timeout = TimeValue.timeValueSeconds(30);
+
+    /**
+     * Construct a request to remove all the voting tombstones from the cluster state.
+     */
+    public ClearVotingTombstonesRequest() {
+    }
+
+    /**
+     * Deserialize a request: the superclass fields first, then the wait-for-removal flag and the timeout, mirroring
+     * {@link #writeTo(StreamOutput)}.
+     */
+    public ClearVotingTombstonesRequest(StreamInput in) throws IOException {
+        super(in);
+        waitForRemoval = in.readBoolean();
+        timeout = in.readTimeValue();
+    }
+
+    /**
+     * @return whether to wait for the tombstoned nodes to be removed from the cluster before removing their tombstones. True by default.
+     */
+    public boolean getWaitForRemoval() {
+        return waitForRemoval;
+    }
+
+    /**
+     * @param waitForRemoval whether to wait for the tombstoned nodes to be removed from the cluster before removing their tombstones. True
+     *                       by default.
+     */
+    public void setWaitForRemoval(boolean waitForRemoval) {
+        this.waitForRemoval = waitForRemoval;
+    }
+
+    /**
+     * @param timeout how long to wait for the tombstoned nodes to be removed if {@link ClearVotingTombstonesRequest#waitForRemoval} is
+     *                true. Defaults to 30 seconds.
+     */
+    public void setTimeout(TimeValue timeout) {
+        this.timeout = timeout;
+    }
+
+    /**
+     * @return how long to wait for the tombstoned nodes to be removed if {@link ClearVotingTombstonesRequest#waitForRemoval} is
+     *         true. Defaults to 30 seconds.
+     */
+    public TimeValue getTimeout() {
+        return timeout;
+    }
+
+    /**
+     * @return always {@code null}: no validation is performed here.
+     */
+    @Override
+    public ActionRequestValidationException validate() {
+        return null;
+    }
+
+    /**
+     * Unsupported: use {@link #ClearVotingTombstonesRequest(StreamInput)} instead.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void readFrom(StreamInput in) throws IOException {
+        throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
+    }
+
+    /**
+     * Serialize the superclass fields, then the wait-for-removal flag and the timeout.
+     */
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        out.writeBoolean(waitForRemoval);
+        out.writeTimeValue(timeout);
+    }
+
+    @Override
+    public String toString() {
+        // The first field carries no separator, so the output reads "{waitForRemoval=..., timeout=...}".
+        return "ClearVotingTombstonesRequest{" +
+            "waitForRemoval=" + waitForRemoval +
+            ", timeout=" + timeout +
+            '}';
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesResponse.java
new file mode 100644
index 00000000000..1237e2e265f
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesResponse.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.action.admin.cluster.configuration;
+
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+
+import java.io.IOException;
+
+/**
+ * The (field-less) reply to a {@link ClearVotingTombstonesRequest}: receiving it means that voting tombstones have been cleared from the
+ * cluster state.
+ */
+public class ClearVotingTombstonesResponse extends ActionResponse {
+
+    /** Create a response; it declares no fields of its own. */
+    public ClearVotingTombstonesResponse() {
+    }
+
+    /** Read a response from the wire; only the superclass reads from the stream. */
+    public ClearVotingTombstonesResponse(StreamInput in) throws IOException {
+        super(in);
+    }
+
+    /**
+     * Not supported; deserialize with {@link #ClearVotingTombstonesResponse(StreamInput)}.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void readFrom(StreamInput in) throws IOException {
+        throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
+    }
+
+    /** Write the response to the wire; nothing is written beyond what the superclass writes. */
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java
new file mode 100644
index 00000000000..34f452988d2
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesAction.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership.
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.configuration; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterState.Builder; +import org.elasticsearch.cluster.ClusterStateObserver; +import org.elasticsearch.cluster.ClusterStateObserver.Listener; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Setting.Property; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPool.Names; +import 
org.elasticsearch.transport.TransportService;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * Master-node action that adds voting tombstones for the nodes matched by an {@link AddVotingTombstonesRequest} and then waits, up to
+ * the request's timeout, until none of those nodes is in the last-committed voting configuration.
+ */
+public class TransportAddVotingTombstonesAction
+    extends TransportMasterNodeAction<AddVotingTombstonesRequest, AddVotingTombstonesResponse> {
+
+    /**
+     * Maximum total number of voting tombstones allowed in the cluster state; requests that would exceed it are rejected.
+     * NOTE(review): the literals 10 and 1 are presumably the default and the minimum, per Setting.intSetting's argument order - confirm.
+     */
+    public static final Setting<Integer> MAXIMUM_VOTING_TOMBSTONES_SETTING
+        = Setting.intSetting("cluster.max_voting_tombstones", 10, 1, Property.Dynamic, Property.NodeScope);
+
+    @Inject
+    public TransportAddVotingTombstonesAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
+                                              ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
+        super(AddVotingTombstonesAction.NAME, transportService, clusterService, threadPool, actionFilters, AddVotingTombstonesRequest::new,
+            indexNameExpressionResolver);
+    }
+
+    @Override
+    protected String executor() {
+        return Names.SAME;
+    }
+
+    /**
+     * Unsupported: responses are built from a stream by {@link #read(StreamInput)}.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    protected AddVotingTombstonesResponse newResponse() {
+        throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
+    }
+
+    @Override
+    protected AddVotingTombstonesResponse read(StreamInput in) throws IOException {
+        return new AddVotingTombstonesResponse(in);
+    }
+
+    /**
+     * Validate the request against the given state, submit a cluster state update that adds the tombstones, and complete the listener
+     * once the tombstoned nodes have left the last-committed voting configuration (or fail it on timeout, cluster service close, or
+     * update failure).
+     *
+     * @throws IllegalArgumentException if no master-eligible node matched or the tombstone maximum would be exceeded.
+     */
+    @Override
+    protected void masterOperation(AddVotingTombstonesRequest request, ClusterState state,
+                                   ActionListener<AddVotingTombstonesResponse> listener) throws Exception {
+
+        // Fail fast against the state we were given; the check is repeated inside the update task against the state it runs on.
+        resolveNodesAndCheckMaximum(request, state); // throws IllegalArgumentException if no nodes matched or maximum exceeded
+
+        clusterService.submitStateUpdateTask("add-voting-tombstones", new ClusterStateUpdateTask(Priority.URGENT) {
+
+            // Set by execute() and read by clusterStateProcessed(); the assertion in execute() guards against it being set twice.
+            private Set<DiscoveryNode> resolvedNodes;
+
+            @Override
+            public ClusterState execute(ClusterState currentState) {
+                assert resolvedNodes == null : resolvedNodes;
+                resolvedNodes = resolveNodesAndCheckMaximum(request, currentState);
+
+                final Builder builder = ClusterState.builder(currentState);
+                resolvedNodes.forEach(builder::addVotingTombstone);
+                final ClusterState newState = builder.build();
+                assert newState.getVotingTombstones().size() <= MAXIMUM_VOTING_TOMBSTONES_SETTING.get(currentState.metaData().settings());
+                return newState;
+            }
+
+            @Override
+            public void onFailure(String source, Exception e) {
+                listener.onFailure(e);
+            }
+
+            @Override
+            public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+
+                final ClusterStateObserver observer
+                    = new ClusterStateObserver(clusterService, request.getTimeout(), logger, threadPool.getThreadContext());
+
+                final Set<String> resolvedNodeIds = resolvedNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet());
+
+                // True once none of the newly tombstoned nodes is part of the last-committed voting configuration.
+                final Predicate<ClusterState> allNodesRemoved = clusterState -> {
+                    final Set<String> votingNodeIds = clusterState.getLastCommittedConfiguration().getNodeIds();
+                    return resolvedNodeIds.stream().noneMatch(votingNodeIds::contains);
+                };
+
+                final Listener clusterStateListener = new Listener() {
+                    @Override
+                    public void onNewClusterState(ClusterState state) {
+                        listener.onResponse(new AddVotingTombstonesResponse());
+                    }
+
+                    @Override
+                    public void onClusterServiceClose() {
+                        listener.onFailure(new ElasticsearchException("cluster service closed while waiting for withdrawal of votes from "
+                            + resolvedNodes));
+                    }
+
+                    @Override
+                    public void onTimeout(TimeValue timeout) {
+                        listener.onFailure(new ElasticsearchTimeoutException("timed out waiting for withdrawal of votes from "
+                            + resolvedNodes));
+                    }
+                };
+
+                // Respond immediately if the published state already satisfies the predicate, otherwise wait for a later state.
+                if (allNodesRemoved.test(newState)) {
+                    clusterStateListener.onNewClusterState(newState);
+                } else {
+                    observer.waitForNextChange(clusterStateListener, allNodesRemoved);
+                }
+            }
+        });
+    }
+
+    /**
+     * Resolve the request's nodes against {@code state}, enforcing the maximum read from that state's metadata settings.
+     */
+    private static Set<DiscoveryNode> resolveNodesAndCheckMaximum(AddVotingTombstonesRequest request, ClusterState state) {
+        return request.resolveNodesAndCheckMaximum(state,
+            MAXIMUM_VOTING_TOMBSTONES_SETTING.get(state.metaData().settings()), MAXIMUM_VOTING_TOMBSTONES_SETTING.getKey());
+    }
+
+    @Override
+    protected ClusterBlockException checkBlock(AddVotingTombstonesRequest request, ClusterState state) {
+        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesAction.java
new file mode 100644
index 00000000000..14592b37ed1
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesAction.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.action.admin.cluster.configuration;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.master.TransportMasterNodeAction;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterState.Builder;
+import org.elasticsearch.cluster.ClusterStateObserver;
+import org.elasticsearch.cluster.ClusterStateObserver.Listener;
+import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.threadpool.ThreadPool.Names;
+import org.elasticsearch.transport.TransportService;
+
+import java.io.IOException;
+import java.util.function.Predicate;
+
+/**
+ * Master-node action that clears all voting tombstones from the cluster state, optionally waiting first (up to the request's timeout)
+ * until no node with a tombstoned ID remains in the cluster.
+ */
+public class TransportClearVotingTombstonesAction
+    extends TransportMasterNodeAction<ClearVotingTombstonesRequest, ClearVotingTombstonesResponse> {
+
+    @Inject
+    public TransportClearVotingTombstonesAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
+                                                ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
+        super(ClearVotingTombstonesAction.NAME, transportService, clusterService, threadPool, actionFilters,
+            ClearVotingTombstonesRequest::new, indexNameExpressionResolver);
+    }
+
+    @Override
+    protected String executor() {
+        return Names.SAME;
+    }
+
+    /**
+     * Unsupported: responses are built from a stream by {@link #read(StreamInput)}.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    protected ClearVotingTombstonesResponse newResponse() {
+        throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
+    }
+
+    @Override
+    protected ClearVotingTombstonesResponse read(StreamInput in) throws IOException {
+        return new ClearVotingTombstonesResponse(in);
+    }
+
+    /**
+     * If the request asks to wait for removal and some tombstoned node is still present in {@code initialState}, observe the cluster
+     * state until all of them are gone before submitting the clearing task; otherwise submit the clearing task straight away. The
+     * listener is failed on timeout or if the cluster service closes while waiting.
+     */
+    @Override
+    protected void masterOperation(ClearVotingTombstonesRequest request, ClusterState initialState,
+                                   ActionListener<ClearVotingTombstonesResponse> listener) throws Exception {
+
+        // Recorded up front so that the update task's own timeout can be reduced by the time spent waiting for node removal.
+        final long startTimeMillis = threadPool.relativeTimeInMillis();
+
+        // Only the tombstones present in the initial state are considered, not any that are added while waiting.
+        final Predicate<ClusterState> allTombstonedNodesRemoved = newState -> {
+            for (DiscoveryNode tombstone : initialState.getVotingTombstones()) {
+                // NB checking for the existence of any node with this persistent ID, because persistent IDs are how votes are counted.
+                // Calling nodeExists(tombstone) is insufficient because this compares on the ephemeral ID.
+                if (newState.nodes().nodeExists(tombstone.getId())) {
+                    return false;
+                }
+            }
+            return true;
+        };
+
+        if (request.getWaitForRemoval() && allTombstonedNodesRemoved.test(initialState) == false) {
+            final ClusterStateObserver clusterStateObserver = new ClusterStateObserver(initialState, clusterService, request.getTimeout(),
+                logger, threadPool.getThreadContext());
+
+            clusterStateObserver.waitForNextChange(new Listener() {
+                @Override
+                public void onNewClusterState(ClusterState state) {
+                    submitClearTombstonesTask(request, startTimeMillis, listener);
+                }
+
+                @Override
+                public void onClusterServiceClose() {
+                    listener.onFailure(new ElasticsearchException("cluster service closed while waiting for removal of nodes "
+                        + initialState.getVotingTombstones()));
+                }
+
+                @Override
+                public void onTimeout(TimeValue timeout) {
+                    listener.onFailure(new ElasticsearchTimeoutException(
+                        "timed out waiting for removal of nodes; if nodes should not be removed, set waitForRemoval to false. "
+                        + initialState.getVotingTombstones()));
+                }
+            }, allTombstonedNodesRemoved);
+        } else {
+            submitClearTombstonesTask(request, startTimeMillis, listener);
+        }
+    }
+
+    /**
+     * Submit the cluster state update that removes every voting tombstone, responding to the listener once it has been processed.
+     *
+     * @param request the originating request; its timeout bounds the whole operation.
+     * @param startTimeMillis the relative time at which the operation started, used to compute the remaining timeout.
+     * @param listener notified of success or failure of the update.
+     */
+    private void submitClearTombstonesTask(ClearVotingTombstonesRequest request, long startTimeMillis,
+                                           ActionListener<ClearVotingTombstonesResponse> listener) {
+        clusterService.submitStateUpdateTask("clear-voting-tombstones", new ClusterStateUpdateTask(Priority.URGENT) {
+            @Override
+            public ClusterState execute(ClusterState currentState) {
+                final Builder builder = ClusterState.builder(currentState);
+                builder.clearVotingTombstones();
+                return builder.build();
+            }
+
+            @Override
+            public void onFailure(String source, Exception e) {
+                listener.onFailure(e);
+            }
+
+            @Override
+            public TimeValue timeout() {
+                // Remaining budget = request timeout minus the time already spent since masterOperation started. Clamp at zero so that
+                // an exhausted budget never yields a negative timeout.
+                // NOTE(review): a negative timeout may be treated as "no timeout" by the task executor - confirm.
+                final long elapsedMillis = threadPool.relativeTimeInMillis() - startTimeMillis;
+                return TimeValue.timeValueMillis(Math.max(0L, request.getTimeout().millis() - elapsedMillis));
+            }
+
+            @Override
+            public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                listener.onResponse(new ClearVotingTombstonesResponse());
+            }
+        });
+    }
+
+    @Override
+    protected ClusterBlockException checkBlock(ClearVotingTombstonesRequest request, ClusterState state) {
+        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java
index 99b80d7a1e6..90f8c9317fa 100644
--- a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java
+++ b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java
@@ -62,6 +62,7 @@ import org.elasticsearch.discovery.Discovery;
 import org.elasticsearch.discovery.zen.PublishClusterStateAction;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -72,6 +73,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import
java.util.stream.Collectors; /** * Represents the current state of the cluster. @@ -186,18 +188,21 @@ public class ClusterState implements ToXContentFragment, Diffable private final VotingConfiguration lastAcceptedConfiguration; + private final Set votingTombstones; + // built on demand private volatile RoutingNodes routingNodes; public ClusterState(long term, long version, String stateUUID, ClusterState state) { this(state.clusterName, term, version, stateUUID, state.metaData(), state.routingTable(), state.nodes(), state.blocks(), - state.customs(), state.getLastCommittedConfiguration(), state.getLastAcceptedConfiguration(), false); + state.customs(), state.getLastCommittedConfiguration(), state.getLastAcceptedConfiguration(), state.getVotingTombstones(), + false); } public ClusterState(ClusterName clusterName, long term, long version, String stateUUID, MetaData metaData, RoutingTable routingTable, DiscoveryNodes nodes, ClusterBlocks blocks, ImmutableOpenMap customs, VotingConfiguration lastCommittedConfiguration, VotingConfiguration lastAcceptedConfiguration, - boolean wasReadFromDiff) { + Set votingTombstones, boolean wasReadFromDiff) { this.term = term; this.version = version; this.stateUUID = stateUUID; @@ -209,6 +214,7 @@ public class ClusterState implements ToXContentFragment, Diffable this.customs = customs; this.lastCommittedConfiguration = lastCommittedConfiguration; this.lastAcceptedConfiguration = lastAcceptedConfiguration; + this.votingTombstones = votingTombstones; this.wasReadFromDiff = wasReadFromDiff; } @@ -288,6 +294,10 @@ public class ClusterState implements ToXContentFragment, Diffable return lastCommittedConfiguration; } + public Set getVotingTombstones() { + return votingTombstones; + } + // Used for testing and logging to determine how this cluster state was send over the wire public boolean wasReadFromDiff() { return wasReadFromDiff; @@ -313,6 +323,7 @@ public class ClusterState implements ToXContentFragment, Diffable sb.append("state uuid: 
").append(stateUUID).append("\n"); sb.append("last committed config: ").append(getLastCommittedConfiguration()).append("\n"); sb.append("last accepted config: ").append(getLastAcceptedConfiguration()).append("\n"); + sb.append("voting tombstones: ").append(votingTombstones).append("\n"); sb.append("from_diff: ").append(wasReadFromDiff).append("\n"); sb.append("meta data version: ").append(metaData.version()).append("\n"); final String TAB = " "; @@ -428,6 +439,7 @@ public class ClusterState implements ToXContentFragment, Diffable builder.field("state_uuid", stateUUID); builder.field("last_committed_config", lastCommittedConfiguration); builder.field("last_accepted_config", lastAcceptedConfiguration); + // TODO include voting tombstones here } if (metrics.contains(Metric.MASTER_NODE)) { @@ -637,6 +649,7 @@ public class ClusterState implements ToXContentFragment, Diffable private DiscoveryNodes nodes = DiscoveryNodes.EMPTY_NODES; private ClusterBlocks blocks = ClusterBlocks.EMPTY_CLUSTER_BLOCK; private final ImmutableOpenMap.Builder customs; + private final Set votingTombstones = new HashSet<>(); private boolean fromDiff; @@ -647,6 +660,7 @@ public class ClusterState implements ToXContentFragment, Diffable this.uuid = state.stateUUID(); this.lastCommittedConfiguration = state.getLastCommittedConfiguration(); this.lastAcceptedConfiguration = state.getLastAcceptedConfiguration(); + this.votingTombstones.addAll(state.getVotingTombstones()); this.nodes = state.nodes(); this.routingTable = state.routingTable(); this.metaData = state.metaData(); @@ -752,7 +766,8 @@ public class ClusterState implements ToXContentFragment, Diffable uuid = UUIDs.randomBase64UUID(); } return new ClusterState(clusterName, term, version, uuid, metaData, routingTable, nodes, blocks, customs.build(), - lastCommittedConfiguration, lastAcceptedConfiguration, fromDiff); + lastCommittedConfiguration, lastAcceptedConfiguration, Collections.unmodifiableSet(new HashSet<>(votingTombstones)), + fromDiff); 
} public static byte[] toBytes(ClusterState state) throws IOException { @@ -770,6 +785,14 @@ public class ClusterState implements ToXContentFragment, Diffable return readFrom(in, localNode); } + + public void addVotingTombstone(DiscoveryNode tombstone) { + votingTombstones.add(tombstone); + } + + public void clearVotingTombstones() { + votingTombstones.clear(); + } } @Override @@ -792,6 +815,7 @@ public class ClusterState implements ToXContentFragment, Diffable if (in.getVersion().onOrAfter(Version.V_7_0_0)) { builder.lastCommittedConfiguration(new VotingConfiguration(in)); builder.lastAcceptedConfiguration(new VotingConfiguration(in)); + in.readSet(DiscoveryNode::new).forEach(builder::addVotingTombstone); } builder.metaData = MetaData.readFrom(in); builder.routingTable = RoutingTable.readFrom(in); @@ -816,6 +840,7 @@ public class ClusterState implements ToXContentFragment, Diffable if (out.getVersion().onOrAfter(Version.V_7_0_0)) { lastCommittedConfiguration.writeTo(out); lastAcceptedConfiguration.writeTo(out); + out.writeCollection(votingTombstones, (o, v) -> v.writeTo(o)); } metaData.writeTo(out); routingTable.writeTo(out); @@ -852,6 +877,8 @@ public class ClusterState implements ToXContentFragment, Diffable private final VotingConfiguration lastAcceptedConfiguration; + private final Set votingTombstones; + private final Diff routingTable; private final Diff nodes; @@ -870,6 +897,7 @@ public class ClusterState implements ToXContentFragment, Diffable clusterName = after.clusterName; lastCommittedConfiguration = after.lastCommittedConfiguration; lastAcceptedConfiguration = after.lastAcceptedConfiguration; + votingTombstones = after.votingTombstones; routingTable = after.routingTable.diff(before.routingTable); nodes = after.nodes.diff(before.nodes); metaData = after.metaData.diff(before.metaData); @@ -890,9 +918,11 @@ public class ClusterState implements ToXContentFragment, Diffable if (in.getVersion().onOrAfter(Version.V_7_0_0)) { lastCommittedConfiguration = new 
VotingConfiguration(in); lastAcceptedConfiguration = new VotingConfiguration(in); + votingTombstones = in.readSet(DiscoveryNode::new); } else { lastCommittedConfiguration = VotingConfiguration.EMPTY_CONFIG; lastAcceptedConfiguration = VotingConfiguration.EMPTY_CONFIG; + votingTombstones = Collections.emptySet(); } routingTable = RoutingTable.readDiffFrom(in); nodes = DiscoveryNodes.readDiffFrom(in, localNode); @@ -913,6 +943,7 @@ public class ClusterState implements ToXContentFragment, Diffable if (out.getVersion().onOrAfter(Version.V_7_0_0)) { lastCommittedConfiguration.writeTo(out); lastAcceptedConfiguration.writeTo(out); + out.writeCollection(votingTombstones, (o, v) -> v.writeTo(o)); } routingTable.writeTo(out); nodes.writeTo(out); @@ -936,6 +967,7 @@ public class ClusterState implements ToXContentFragment, Diffable builder.version(toVersion); builder.lastCommittedConfiguration(lastCommittedConfiguration); builder.lastAcceptedConfiguration(lastAcceptedConfiguration); + votingTombstones.forEach(builder::addVotingTombstone); builder.routingTable(routingTable.apply(state.routingTable)); builder.nodes(nodes.apply(state.nodes)); builder.metaData(metaData.apply(state.metaData)); @@ -1009,5 +1041,10 @@ public class ClusterState implements ToXContentFragment, Diffable } return builder.endArray(); } + + public static VotingConfiguration of(DiscoveryNode... 
nodes) { + // this could be used in many more places - TODO use this where appropriate + return new VotingConfiguration(Arrays.stream(nodes).map(DiscoveryNode::getId).collect(Collectors.toSet())); + } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 6b15d8a51ca..ae69ff27718 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -76,7 +76,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import static java.util.Collections.emptySet; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet; import static org.elasticsearch.discovery.DiscoverySettings.NO_MASTER_BLOCK_WRITES; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; @@ -642,8 +641,9 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery final Set liveNodes = StreamSupport.stream(clusterState.nodes().spliterator(), false) .filter(this::hasJoinVoteFrom).collect(Collectors.toSet()); - final ClusterState.VotingConfiguration newConfig = reconfigurator.reconfigure( - liveNodes, emptySet(), clusterState.getLastAcceptedConfiguration()); + final ClusterState.VotingConfiguration newConfig = reconfigurator.reconfigure(liveNodes, + clusterState.getVotingTombstones().stream().map(DiscoveryNode::getId).collect(Collectors.toSet()), + clusterState.getLastAcceptedConfiguration()); if (newConfig.equals(clusterState.getLastAcceptedConfiguration()) == false) { assert coordinationState.get().joinVotesHaveQuorumFor(newConfig); return ClusterState.builder(clusterState).lastAcceptedConfiguration(newConfig).build(); diff --git a/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java 
b/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java index 9667341cd6e..db796d5ee3e 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java @@ -79,7 +79,8 @@ public abstract class AbstractScopedSettings extends AbstractComponent { Map> keySettings = new HashMap<>(); for (Setting setting : settingsSet) { if (setting.getProperties().contains(scope) == false) { - throw new IllegalArgumentException("Setting must be a " + scope + " setting but has: " + setting.getProperties()); + throw new IllegalArgumentException("Setting " + setting + " must be a " + + scope + " setting but has: " + setting.getProperties()); } validateSettingKey(setting); diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 71929276bb1..7f159ddb42a 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -19,6 +19,7 @@ package org.elasticsearch.common.settings; import org.apache.logging.log4j.LogManager; +import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingTombstonesAction; import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction; import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.support.AutoCreateIndex; @@ -457,7 +458,8 @@ public final class ClusterSettings extends AbstractScopedSettings { ElectionSchedulerFactory.ELECTION_DURATION_SETTING, Coordinator.PUBLISH_TIMEOUT_SETTING, JoinHelper.JOIN_TIMEOUT_SETTING, - Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION + Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION, + TransportAddVotingTombstonesAction.MAXIMUM_VOTING_TOMBSTONES_SETTING ))); public static List> 
BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList( diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterActionTests.java index 8aba67e89bd..c94bb145a1f 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportBootstrapClusterActionTests.java @@ -62,7 +62,8 @@ import static org.mockito.Mockito.verifyZeroInteractions; public class TransportBootstrapClusterActionTests extends ESTestCase { - private final ActionFilters EMPTY_FILTERS = new ActionFilters(emptySet()); + private static final ActionFilters EMPTY_FILTERS = new ActionFilters(emptySet()); + private DiscoveryNode discoveryNode; private static ThreadPool threadPool; private TransportService transportService; diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java index 47f1355dd35..1a8ef39fdf8 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/bootstrap/TransportGetDiscoveredNodesActionTests.java @@ -46,8 +46,9 @@ import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService.HandshakeResponse; -import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import java.io.IOException; import java.util.Random; @@ -69,14 +70,25 @@ import static 
org.mockito.Mockito.verifyZeroInteractions; public class TransportGetDiscoveredNodesActionTests extends ESTestCase { - private final ActionFilters EMPTY_FILTERS = new ActionFilters(emptySet()); + private static final ActionFilters EMPTY_FILTERS = new ActionFilters(emptySet()); + + private static ThreadPool threadPool; private DiscoveryNode localNode; - private ThreadPool threadPool; private String clusterName; private TransportService transportService; private Coordinator coordinator; private DiscoveryNode otherNode; + @BeforeClass + public static void createThreadPool() { + threadPool = new TestThreadPool("test", Settings.EMPTY); + } + + @AfterClass + public static void shutdownThreadPool() { + threadPool.shutdown(); + } + @Before public void setupTest() { clusterName = randomAlphaOfLength(10); @@ -91,7 +103,6 @@ public class TransportGetDiscoveredNodesActionTests extends ESTestCase { } } }; - threadPool = new TestThreadPool("test", Settings.EMPTY); transportService = transport.createTransportService( Settings.builder().put(CLUSTER_NAME_SETTING.getKey(), clusterName).build(), threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet()); @@ -104,17 +115,11 @@ public class TransportGetDiscoveredNodesActionTests extends ESTestCase { new NoOpClusterApplier(), new Random(random().nextLong())); } - @After - public void cleanUp() { - threadPool.shutdown(); - } - public void testHandlesNonstandardDiscoveryImplementation() throws InterruptedException { final Discovery discovery = mock(Discovery.class); verifyZeroInteractions(discovery); new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, discovery); // registers action - transportService.start(); transportService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequestTests.java 
b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequestTests.java new file mode 100644 index 00000000000..cac669aa7c8 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesRequestTests.java @@ -0,0 +1,106 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.elasticsearch.action.admin.cluster.configuration;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNode.Role;
+import org.elasticsearch.cluster.node.DiscoveryNodes.Builder;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singleton;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+
+/** Tests wire serialization of {@link AddVotingTombstonesRequest} and how its node descriptions resolve against a cluster state. */
+public class AddVotingTombstonesRequestTests extends ESTestCase {
+    public void testSerialization() throws IOException {
+        int descriptionCount = between(0, 5);
+        String[] descriptions = new String[descriptionCount];
+        for (int i = 0; i < descriptionCount; i++) {
+            descriptions[i] = randomAlphaOfLength(10);
+        }
+        TimeValue timeout = TimeValue.timeValueMillis(between(0, 30000));
+        final AddVotingTombstonesRequest originalRequest = new AddVotingTombstonesRequest(descriptions, timeout);
+        final AddVotingTombstonesRequest deserialized = copyWriteable(originalRequest, writableRegistry(), AddVotingTombstonesRequest::new);
+        assertThat(deserialized.getNodeDescriptions(), equalTo(originalRequest.getNodeDescriptions()));
+        assertThat(deserialized.getTimeout(), equalTo(originalRequest.getTimeout()));
+    }
+
+    public void testResolve() {
+        final DiscoveryNode localNode = makeDiscoveryNode("local");
+        final DiscoveryNode otherNode1 = makeDiscoveryNode("other1");
+        final DiscoveryNode otherNode2 = makeDiscoveryNode("other2");
+        final DiscoveryNode otherDataNode
+            = new DiscoveryNode("data", "data", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
+
+        final ClusterState clusterState = ClusterState.builder(new ClusterName("cluster")).nodes(new Builder()
+            .add(localNode).add(otherNode1).add(otherNode2).add(otherDataNode).localNodeId(localNode.getId())).build();
+
+        assertThat(makeRequest().resolveNodes(clusterState), containsInAnyOrder(localNode, otherNode1, otherNode2));
+        assertThat(makeRequest("_all").resolveNodes(clusterState), containsInAnyOrder(localNode, otherNode1, otherNode2));
+        assertThat(makeRequest("_local").resolveNodes(clusterState), contains(localNode));
+        assertThat(makeRequest("other*").resolveNodes(clusterState), containsInAnyOrder(otherNode1, otherNode2));
+
+        assertThat(expectThrows(IllegalArgumentException.class, () -> makeRequest("not-a-node").resolveNodes(clusterState)).getMessage(),
+            equalTo("add voting tombstones request for [not-a-node] matched no master-eligible nodes"));
+    }
+
+    public void testResolveAndCheckMaximum() {
+        final DiscoveryNode localNode = makeDiscoveryNode("local");
+        final DiscoveryNode otherNode1 = makeDiscoveryNode("other1");
+        final DiscoveryNode otherNode2 = makeDiscoveryNode("other2");
+
+        final ClusterState.Builder builder = ClusterState.builder(new ClusterName("cluster")).nodes(new Builder()
+            .add(localNode).add(otherNode1).add(otherNode2).localNodeId(localNode.getId()));
+        builder.addVotingTombstone(otherNode1);
+        final ClusterState clusterState = builder.build();
+
+        assertThat(makeRequest().resolveNodesAndCheckMaximum(clusterState, 3, "setting.name"),
+            containsInAnyOrder(localNode, otherNode2));
+        assertThat(makeRequest("_local").resolveNodesAndCheckMaximum(clusterState, 2, "setting.name"),
+            contains(localNode));
+
+        assertThat(expectThrows(IllegalArgumentException.class,
+            () -> makeRequest().resolveNodesAndCheckMaximum(clusterState, 2, "setting.name")).getMessage(),
+            equalTo("add voting tombstones request for [] would add [2] voting tombstones to the existing [1] which would exceed the "
+                + "maximum of [2] set by [setting.name]"));
+        assertThat(expectThrows(IllegalArgumentException.class,
+            () -> makeRequest("_local").resolveNodesAndCheckMaximum(clusterState, 1, "setting.name")).getMessage(),
+            equalTo("add voting tombstones request for [_local] would add [1] voting tombstones to the existing [1] which would exceed "
+                + "the maximum of [1] set by [setting.name]"));
+    }
+
+    /** Builds a {@link Role#MASTER} node identified by {@code name}, on a fresh fake transport address. */
+    private static DiscoveryNode makeDiscoveryNode(String name) {
+        return new DiscoveryNode(name, name, buildNewFakeTransportAddress(), emptyMap(), singleton(Role.MASTER), Version.CURRENT);
+    }
+
+    private static AddVotingTombstonesRequest makeRequest(String... descriptions) {
+        return new AddVotingTombstonesRequest(descriptions);
+    }
+}
diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesResponseTests.java
new file mode 100644
index 00000000000..4b7e5a95ecb
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/AddVotingTombstonesResponseTests.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.action.admin.cluster.configuration;
+
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+
+public class AddVotingTombstonesResponseTests extends ESTestCase {
+    public void testSerialization() throws IOException {
+        final AddVotingTombstonesResponse originalResponse = new AddVotingTombstonesResponse();
+        copyWriteable(originalResponse, writableRegistry(), AddVotingTombstonesResponse::new);
+        // the response has no fields, so all we check is that the round trip does not throw
+    }
+}
diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesRequestTests.java
new file mode 100644
index 00000000000..ad8fa3a3f52
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesRequestTests.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.action.admin.cluster.configuration;
+
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class ClearVotingTombstonesRequestTests extends ESTestCase {
+    /** A request whose fields are randomly overridden must come back from a wire round trip with the same field values. */
+    public void testSerialization() throws IOException {
+        final ClearVotingTombstonesRequest request = new ClearVotingTombstonesRequest();
+        if (randomBoolean()) {
+            request.setWaitForRemoval(randomBoolean());
+        }
+        if (randomBoolean()) {
+            request.setTimeout(TimeValue.timeValueMillis(randomLongBetween(0, 30000)));
+        }
+        final ClearVotingTombstonesRequest copy = copyWriteable(request, writableRegistry(), ClearVotingTombstonesRequest::new);
+        assertThat(copy.getWaitForRemoval(), equalTo(request.getWaitForRemoval()));
+        assertThat(copy.getTimeout(), equalTo(request.getTimeout()));
+    }
+}
diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesResponseTests.java
new file mode 100644
index 00000000000..3ae9eba2c14
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/ClearVotingTombstonesResponseTests.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. 
See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.action.admin.cluster.configuration;
+
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+
+public class ClearVotingTombstonesResponseTests extends ESTestCase {
+    public void testSerialization() throws IOException {
+        final ClearVotingTombstonesResponse originalResponse = new ClearVotingTombstonesResponse();
+        copyWriteable(originalResponse, writableRegistry(), ClearVotingTombstonesResponse::new);
+        // the response has no fields, so all we check is that the round trip does not throw
+    }
+}
diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesActionTests.java
new file mode 100644
index 00000000000..87db69ef787
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportAddVotingTombstonesActionTests.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.configuration; + +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.Version; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterState.VotingConfiguration; +import org.elasticsearch.cluster.ClusterStateObserver; +import org.elasticsearch.cluster.ClusterStateObserver.Listener; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNode.Role; +import org.elasticsearch.cluster.node.DiscoveryNodes.Builder; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.MockTransport; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPool.Names; +import 
org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportResponseHandler; +import org.elasticsearch.transport.TransportService; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static java.util.Collections.singleton; +import static org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingTombstonesAction.MAXIMUM_VOTING_TOMBSTONES_SETTING; +import static org.elasticsearch.cluster.ClusterState.builder; +import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; +import static org.elasticsearch.test.ClusterServiceUtils.setState; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.sameInstance; +import static org.hamcrest.Matchers.startsWith; + +public class TransportAddVotingTombstonesActionTests extends ESTestCase { + + private static ThreadPool threadPool; + private static ClusterService clusterService; + private static DiscoveryNode localNode, otherNode1, otherNode2, otherDataNode; + + private TransportService transportService; + private ClusterStateObserver clusterStateObserver; + + @BeforeClass + public static void createThreadPoolAndClusterService() { + threadPool = new TestThreadPool("test", Settings.EMPTY); + localNode = makeDiscoveryNode("local"); + otherNode1 = makeDiscoveryNode("other1"); + otherNode2 = makeDiscoveryNode("other2"); + otherDataNode = new DiscoveryNode("data", "data", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + 
clusterService = createClusterService(threadPool, localNode); + } + + private static DiscoveryNode makeDiscoveryNode(String name) { + return new DiscoveryNode(name, name, buildNewFakeTransportAddress(), emptyMap(), singleton(Role.MASTER), Version.CURRENT); + } + + @AfterClass + public static void shutdownThreadPoolAndClusterService() { + clusterService.stop(); + threadPool.shutdown(); + } + + @Before + public void setupForTest() { + final MockTransport transport = new MockTransport(); + transportService = transport.createTransportService(Settings.EMPTY, threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet()); + + new TransportAddVotingTombstonesAction(transportService, clusterService, threadPool, new ActionFilters(emptySet()), + new IndexNameExpressionResolver()); // registers action + + transportService.start(); + transportService.acceptIncomingRequests(); + + final VotingConfiguration allNodesConfig = VotingConfiguration.of(localNode, otherNode1, otherNode2); + + setState(clusterService, builder(new ClusterName("cluster")) + .nodes(new Builder().add(localNode).add(otherNode1).add(otherNode2).add(otherDataNode) + .localNodeId(localNode.getId()).masterNodeId(localNode.getId())) + .lastAcceptedConfiguration(allNodesConfig).lastCommittedConfiguration(allNodesConfig)); + + clusterStateObserver = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()); + } + + public void testWithdrawsVoteFromANode() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + + clusterStateObserver.waitForNextChange(new AdjustConfigurationForTombstones()); + transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, + new AddVotingTombstonesRequest(new String[]{"other1"}), + expectSuccess(r -> { + assertNotNull(r); + countDownLatch.countDown(); + }) + ); + + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + 
assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), contains(otherNode1)); + } + + public void testWithdrawsVotesFromMultipleNodes() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + + clusterStateObserver.waitForNextChange(new AdjustConfigurationForTombstones()); + transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, + new AddVotingTombstonesRequest(new String[]{"other1", "other2"}), + expectSuccess(r -> { + assertNotNull(r); + countDownLatch.countDown(); + }) + ); + + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), containsInAnyOrder(otherNode1, otherNode2)); + } + + public void testWithdrawsVotesFromNodesMatchingWildcard() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + + clusterStateObserver.waitForNextChange(new AdjustConfigurationForTombstones()); + transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, + new AddVotingTombstonesRequest(new String[]{"other*"}), + expectSuccess(r -> { + assertNotNull(r); + countDownLatch.countDown(); + }) + ); + + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), containsInAnyOrder(otherNode1, otherNode2)); + } + + public void testWithdrawsVotesFromAllMasterEligibleNodes() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + + clusterStateObserver.waitForNextChange(new AdjustConfigurationForTombstones()); + transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, + new AddVotingTombstonesRequest(new String[]{"_all"}), + expectSuccess(r -> { + assertNotNull(r); + countDownLatch.countDown(); + }) + ); + + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + 
assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), + containsInAnyOrder(localNode, otherNode1, otherNode2)); + } + + public void testWithdrawsVoteFromLocalNode() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + + clusterStateObserver.waitForNextChange(new AdjustConfigurationForTombstones()); + transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, + new AddVotingTombstonesRequest(new String[]{"_local"}), + expectSuccess(r -> { + assertNotNull(r); + countDownLatch.countDown(); + }) + ); + + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), contains(localNode)); + } + + public void testReturnsImmediatelyIfVoteAlreadyWithdrawn() throws InterruptedException { + setState(clusterService, builder(clusterService.state()) + .lastCommittedConfiguration(VotingConfiguration.of(localNode, otherNode2)) + .lastAcceptedConfiguration(VotingConfiguration.of(localNode, otherNode2))); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + + // no observer to reconfigure + transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, + new AddVotingTombstonesRequest(new String[]{"other1"}, TimeValue.ZERO), + expectSuccess(r -> { + assertNotNull(r); + countDownLatch.countDown(); + }) + ); + + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), contains(otherNode1)); + } + + public void testReturnsErrorIfNoMatchingNodes() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + final SetOnce exceptionHolder = new SetOnce<>(); + + transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, + new AddVotingTombstonesRequest(new String[]{"not-a-node"}), + expectError(e -> { + exceptionHolder.set(e); + countDownLatch.countDown(); + }) + ); + + 
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + final Throwable rootCause = exceptionHolder.get().getRootCause(); + assertThat(rootCause, instanceOf(IllegalArgumentException.class)); + assertThat(rootCause.getMessage(), equalTo("add voting tombstones request for [not-a-node] matched no master-eligible nodes")); + } + + public void testOnlyMatchesMasterEligibleNodes() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + final SetOnce exceptionHolder = new SetOnce<>(); + + transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, + new AddVotingTombstonesRequest(new String[]{"_all", "master:false"}), + expectError(e -> { + exceptionHolder.set(e); + countDownLatch.countDown(); + }) + ); + + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + final Throwable rootCause = exceptionHolder.get().getRootCause(); + assertThat(rootCause, instanceOf(IllegalArgumentException.class)); + assertThat(rootCause.getMessage(), + equalTo("add voting tombstones request for [_all, master:false] matched no master-eligible nodes")); + } + + public void testSucceedsEvenIfAllTombstonesAlreadyAdded() throws InterruptedException { + final ClusterState.Builder builder = builder(clusterService.state()); + builder.addVotingTombstone(otherNode1); + setState(clusterService, builder); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + + transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, + new AddVotingTombstonesRequest(new String[]{"other1"}), + expectSuccess(r -> { + assertNotNull(r); + countDownLatch.countDown(); + }) + ); + + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), contains(otherNode1)); + } + + public void testReturnsErrorIfMaximumTombstoneCountExceeded() throws InterruptedException { + final ClusterState.Builder builder = builder(clusterService.state()) + 
.metaData(MetaData.builder(clusterService.state().metaData()).persistentSettings( + Settings.builder().put(clusterService.state().metaData().persistentSettings()) + .put(MAXIMUM_VOTING_TOMBSTONES_SETTING.getKey(), 2).build())); + builder.addVotingTombstone(localNode); + final int existingCount, newCount; + if (randomBoolean()) { + builder.addVotingTombstone(otherNode1); + existingCount = 2; + newCount = 1; + } else { + existingCount = 1; + newCount = 2; + } + setState(clusterService, builder); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + final SetOnce exceptionHolder = new SetOnce<>(); + + transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, + new AddVotingTombstonesRequest(new String[]{"other*"}), + expectError(e -> { + exceptionHolder.set(e); + countDownLatch.countDown(); + }) + ); + + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + final Throwable rootCause = exceptionHolder.get().getRootCause(); + assertThat(rootCause, instanceOf(IllegalArgumentException.class)); + assertThat(rootCause.getMessage(), equalTo("add voting tombstones request for [other*] would add [" + newCount + + "] voting tombstones to the existing [" + existingCount + + "] which would exceed the maximum of [2] set by [cluster.max_voting_tombstones]")); + } + + public void testTimesOut() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + final SetOnce exceptionHolder = new SetOnce<>(); + + transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME, + new AddVotingTombstonesRequest(new String[]{"other1"}, TimeValue.timeValueMillis(100)), + expectError(e -> { + exceptionHolder.set(e); + countDownLatch.countDown(); + }) + ); + + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + final Throwable rootCause = exceptionHolder.get().getRootCause(); + assertThat(rootCause,instanceOf(ElasticsearchTimeoutException.class)); + assertThat(rootCause.getMessage(), startsWith("timed out waiting for 
withdrawal of votes from [{other1}")); + } + + private TransportResponseHandler expectSuccess(Consumer onResponse) { + return responseHandler(onResponse, e -> { + throw new AssertionError("unexpected", e); + }); + } + + private TransportResponseHandler expectError(Consumer onException) { + return responseHandler(r -> { + assert false : r; + }, onException); + } + + private TransportResponseHandler responseHandler(Consumer onResponse, + Consumer onException) { + return new TransportResponseHandler() { + @Override + public void handleResponse(AddVotingTombstonesResponse response) { + onResponse.accept(response); + } + + @Override + public void handleException(TransportException exp) { + onException.accept(exp); + } + + @Override + public String executor() { + return Names.SAME; + } + + @Override + public AddVotingTombstonesResponse read(StreamInput in) throws IOException { + return new AddVotingTombstonesResponse(in); + } + }; + } + + private class AdjustConfigurationForTombstones implements Listener { + @Override + public void onNewClusterState(ClusterState state) { + clusterService.getMasterService().submitStateUpdateTask("reconfiguration", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + assertThat(currentState, sameInstance(state)); + final Set votingNodeIds = new HashSet<>(); + currentState.nodes().forEach(n -> votingNodeIds.add(n.getId())); + currentState.getVotingTombstones().forEach(t -> votingNodeIds.remove(t.getId())); + final VotingConfiguration votingConfiguration = new VotingConfiguration(votingNodeIds); + return builder(currentState) + .lastAcceptedConfiguration(votingConfiguration) + .lastCommittedConfiguration(votingConfiguration).build(); + } + + @Override + public void onFailure(String source, Exception e) { + throw new AssertionError("unexpected failure", e); + } + }); + } + + @Override + public void onClusterServiceClose() { + throw new AssertionError("unexpected close"); + } + + @Override + 
public void onTimeout(TimeValue timeout) { + throw new AssertionError("unexpected timeout"); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesActionTests.java new file mode 100644 index 00000000000..ff978adcee8 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingTombstonesActionTests.java @@ -0,0 +1,202 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.action.admin.cluster.configuration; + +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.Version; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.node.DiscoveryNodes.Builder; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.MockTransport; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportResponseHandler; +import org.elasticsearch.transport.TransportService; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.elasticsearch.cluster.ClusterState.builder; +import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; +import static org.elasticsearch.test.ClusterServiceUtils.setState; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.startsWith; + +public 
class TransportClearVotingTombstonesActionTests extends ESTestCase { + + private static ThreadPool threadPool; + private static ClusterService clusterService; + private static DiscoveryNode localNode, otherNode1, otherNode2; + + private TransportService transportService; + + @BeforeClass + public static void createThreadPoolAndClusterService() { + threadPool = new TestThreadPool("test", Settings.EMPTY); + localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT); + otherNode1 = new DiscoveryNode("other1", "other1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + otherNode2 = new DiscoveryNode("other2", "other2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + clusterService = createClusterService(threadPool, localNode); + } + + @AfterClass + public static void shutdownThreadPoolAndClusterService() { + clusterService.stop(); + threadPool.shutdown(); + } + + @Before + public void setupForTest() { + final MockTransport transport = new MockTransport(); + transportService = transport.createTransportService(Settings.EMPTY, threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet()); + + new TransportClearVotingTombstonesAction(transportService, clusterService, threadPool, new ActionFilters(emptySet()), + new IndexNameExpressionResolver()); // registers action + + transportService.start(); + transportService.acceptIncomingRequests(); + + final ClusterState.Builder builder = builder(new ClusterName("cluster")) + .nodes(new Builder().add(localNode).add(otherNode1).add(otherNode2) + .localNodeId(localNode.getId()).masterNodeId(localNode.getId())); + builder.addVotingTombstone(otherNode1); + builder.addVotingTombstone(otherNode2); + setState(clusterService, builder); + } + + public void testClearsVotingTombstones() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + final SetOnce responseHolder = 
new SetOnce<>(); + + final ClearVotingTombstonesRequest clearVotingTombstonesRequest = new ClearVotingTombstonesRequest(); + clearVotingTombstonesRequest.setWaitForRemoval(false); + transportService.sendRequest(localNode, ClearVotingTombstonesAction.NAME, + clearVotingTombstonesRequest, + expectSuccess(r -> { + responseHolder.set(r); + countDownLatch.countDown(); + }) + ); + + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + assertNotNull(responseHolder.get()); + assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), empty()); + } + + public void testTimesOutIfWaitingForNodesThatAreNotRemoved() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + final SetOnce responseHolder = new SetOnce<>(); + + final ClearVotingTombstonesRequest clearVotingTombstonesRequest = new ClearVotingTombstonesRequest(); + clearVotingTombstonesRequest.setTimeout(TimeValue.timeValueMillis(100)); + transportService.sendRequest(localNode, ClearVotingTombstonesAction.NAME, + clearVotingTombstonesRequest, + expectError(e -> { + responseHolder.set(e); + countDownLatch.countDown(); + }) + ); + + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), containsInAnyOrder(otherNode1, otherNode2)); + final Throwable rootCause = responseHolder.get().getRootCause(); + assertThat(rootCause, instanceOf(ElasticsearchTimeoutException.class)); + assertThat(rootCause.getMessage(), + startsWith("timed out waiting for removal of nodes; if nodes should not be removed, set waitForRemoval to false. 
[")); + } + + public void testSucceedsIfNodesAreRemovedWhileWaiting() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + final SetOnce responseHolder = new SetOnce<>(); + + transportService.sendRequest(localNode, ClearVotingTombstonesAction.NAME, + new ClearVotingTombstonesRequest(), + expectSuccess(r -> { + responseHolder.set(r); + countDownLatch.countDown(); + }) + ); + + final ClusterState.Builder builder = builder(clusterService.state()); + builder.nodes(DiscoveryNodes.builder(clusterService.state().nodes()).remove(otherNode1).remove(otherNode2)); + setState(clusterService, builder); + + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), empty()); + } + + private TransportResponseHandler expectSuccess(Consumer onResponse) { + return responseHandler(onResponse, e -> { + throw new AssertionError("unexpected", e); + }); + } + + private TransportResponseHandler expectError(Consumer onException) { + return responseHandler(r -> { + assert false : r; + }, onException); + } + + private TransportResponseHandler responseHandler(Consumer onResponse, + Consumer onException) { + return new TransportResponseHandler() { + @Override + public void handleResponse(ClearVotingTombstonesResponse response) { + onResponse.accept(response); + } + + @Override + public void handleException(TransportException exp) { + onException.accept(exp); + } + + @Override + public String executor() { + return Names.SAME; + } + + @Override + public ClearVotingTombstonesResponse read(StreamInput in) throws IOException { + return new ClearVotingTombstonesResponse(in); + } + }; + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java index f9da7a1aa8e..acbc9b45b4f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java +++ 
b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java @@ -45,6 +45,7 @@ import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalSettingsPlugin; import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.discovery.TestZenDiscovery; import org.elasticsearch.test.transport.MockTransportService; import java.io.IOException; @@ -66,6 +67,13 @@ import static org.hamcrest.Matchers.not; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0) public class AllocationIdIT extends ESIntegTestCase { + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)) + .put(TestZenDiscovery.USE_ZEN2.getKey(), true) + .build(); + } + @Override protected Collection> nodePlugins() { return Arrays.asList(MockTransportService.TestPlugin.class, MockEngineFactoryPlugin.class, InternalSettingsPlugin.class); @@ -224,15 +232,17 @@ public class AllocationIdIT extends ESIntegTestCase { } private void checkNoValidShardCopy(String indexName, ShardId shardId) throws Exception { - final ClusterAllocationExplanation explanation = - client().admin().cluster().prepareAllocationExplain() - .setIndex(indexName).setShard(shardId.id()).setPrimary(true) - .get().getExplanation(); + assertBusy(() -> { + final ClusterAllocationExplanation explanation = + client().admin().cluster().prepareAllocationExplain() + .setIndex(indexName).setShard(shardId.id()).setPrimary(true) + .get().getExplanation(); - final ShardAllocationDecision shardAllocationDecision = explanation.getShardAllocationDecision(); - assertThat(shardAllocationDecision.isDecisionTaken(), equalTo(true)); - assertThat(shardAllocationDecision.getAllocateDecision().getAllocationDecision(), - equalTo(AllocationDecision.NO_VALID_SHARD_COPY)); + final ShardAllocationDecision shardAllocationDecision = explanation.getShardAllocationDecision(); + 
assertThat(shardAllocationDecision.isDecisionTaken(), equalTo(true)); + assertThat(shardAllocationDecision.getAllocateDecision().getAllocationDecision(), + equalTo(AllocationDecision.NO_VALID_SHARD_COPY)); + }); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 1f71a2c2d9b..42e2d6008c6 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -27,11 +27,14 @@ import com.carrotsearch.randomizedtesting.SysGlobals; import com.carrotsearch.randomizedtesting.generators.RandomNumbers; import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.carrotsearch.randomizedtesting.generators.RandomStrings; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.admin.cluster.configuration.AddVotingTombstonesAction; +import org.elasticsearch.action.admin.cluster.configuration.AddVotingTombstonesRequest; +import org.elasticsearch.action.admin.cluster.configuration.ClearVotingTombstonesAction; +import org.elasticsearch.action.admin.cluster.configuration.ClearVotingTombstonesRequest; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag; @@ -1622,18 +1625,51 @@ public final class InternalTestCluster extends TestCluster { } private synchronized void stopNodesAndClients(Collection nodeAndClients) throws IOException { + final Set withdrawnNodeIds = new HashSet<>(); + if (autoManageMinMasterNodes && nodeAndClients.size() > 0) { - int masters = 
(int)nodeAndClients.stream().filter(NodeAndClient::isMasterEligible).count(); - if (masters > 0) { - updateMinMasterNodes(getMasterNodesCount() - masters); + + final long currentMasters = nodes.values().stream().filter(NodeAndClient::isMasterEligible).count(); + final long stoppingMasters = nodeAndClients.stream().filter(NodeAndClient::isMasterEligible).count(); + + assert stoppingMasters <= currentMasters : currentMasters + " < " + stoppingMasters; + if (stoppingMasters != currentMasters && stoppingMasters > 0) { + // If stopping few enough master-nodes that there's still a majority left, there is no need to withdraw their votes first. + // However, we do not yet have a way to be sure there's a majority left, because the voting configuration may not yet have + // been updated when the previous nodes shut down, so we must always explicitly withdraw votes. + // TODO add cluster health API to check that voting configuration is optimal so this isn't always needed + nodeAndClients.stream().filter(NodeAndClient::isMasterEligible).map(NodeAndClient::getName).forEach(withdrawnNodeIds::add); + assert withdrawnNodeIds.size() == stoppingMasters; + + logger.info("withdrawing votes from {} prior to shutdown", withdrawnNodeIds); + try { + client().execute(AddVotingTombstonesAction.INSTANCE, + new AddVotingTombstonesRequest(withdrawnNodeIds.toArray(new String[0]))).get(); + } catch (InterruptedException | ExecutionException e) { + throw new AssertionError("unexpected", e); + } + } + + if (stoppingMasters > 0) { + updateMinMasterNodes(getMasterNodesCount() - Math.toIntExact(stoppingMasters)); } } + for (NodeAndClient nodeAndClient: nodeAndClients) { removeDisruptionSchemeFromNode(nodeAndClient); NodeAndClient previous = nodes.remove(nodeAndClient.name); assert previous == nodeAndClient; nodeAndClient.close(); } + + if (withdrawnNodeIds.isEmpty() == false) { + logger.info("removing voting tombstones for {} after shutdown", withdrawnNodeIds); + try { + 
client().execute(ClearVotingTombstonesAction.INSTANCE, new ClearVotingTombstonesRequest()).get(); + } catch (InterruptedException | ExecutionException e) { + throw new AssertionError("unexpected", e); + } + } } /** diff --git a/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterIT.java b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterIT.java new file mode 100644 index 00000000000..f9c90a05202 --- /dev/null +++ b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterIT.java @@ -0,0 +1,73 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.test.test; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.ESIntegTestCase.ClusterScope; +import org.elasticsearch.test.discovery.TestZenDiscovery; + +import java.io.IOException; + +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +@ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) +public class InternalTestClusterIT extends ESIntegTestCase { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)) + .put(TestZenDiscovery.USE_ZEN2.getKey(), true) + .build(); + } + + public void testStartingAndStoppingNodes() throws IOException { + logger.info("--> cluster has [{}] nodes", internalCluster().size()); + if (internalCluster().size() < 5) { + final int nodesToStart = randomIntBetween(Math.max(2, internalCluster().size() + 1), 5); + logger.info("--> growing to [{}] nodes", nodesToStart); + internalCluster().startNodes(nodesToStart); + } + ensureGreen(); + + while (internalCluster().size() > 1) { + final int nodesToRemain = randomIntBetween(1, internalCluster().size() - 1); + logger.info("--> reducing to [{}] nodes", nodesToRemain); + internalCluster().ensureAtMostNumDataNodes(nodesToRemain); + assertThat(internalCluster().size(), lessThanOrEqualTo(nodesToRemain)); + } + + ensureGreen(); + } + + public void testStoppingNodesOneByOne() throws IOException { + // In a 5+ node cluster there must be at least one reconfiguration as the nodes are shut down one-by-one before we drop to 2 nodes. + // If the nodes shut down too quickly then this reconfiguration does not have time to occur and the quorum is lost in the 3->2 + // transition, even though in a stable cluster the 3->2 transition requires no special treatment. 
+ + internalCluster().startNodes(5); + ensureGreen(); + + while (internalCluster().size() > 1) { + internalCluster().stopRandomNode(s -> true); + } + + ensureGreen(); + } +}