[Zen2] Introduce vote withdrawal (#35446)
If shutting down half or more of the master-eligible nodes, their votes must first be explicitly withdrawn to ensure that the cluster doesn't lose its quorum. This works via _voting tombstones_, stored in the cluster state, which tell the reconfigurator to remove nodes from the voting configuration. This change introduces voting tombstones to the cluster state, together with transport APIs for adding and removing them, and makes use of these APIs in `InternalTestCluster` to support tests which remove at least half of the master-eligible nodes at once (e.g. shrinking from two master-eligible nodes to one).
This commit is contained in:
parent
0e1a12122c
commit
8e40a2bbe2
|
@ -27,6 +27,10 @@ import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterAction;
|
|||
import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesAction;
|
||||
import org.elasticsearch.action.admin.cluster.bootstrap.TransportBootstrapClusterAction;
|
||||
import org.elasticsearch.action.admin.cluster.bootstrap.TransportGetDiscoveredNodesAction;
|
||||
import org.elasticsearch.action.admin.cluster.configuration.AddVotingTombstonesAction;
|
||||
import org.elasticsearch.action.admin.cluster.configuration.ClearVotingTombstonesAction;
|
||||
import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingTombstonesAction;
|
||||
import org.elasticsearch.action.admin.cluster.configuration.TransportClearVotingTombstonesAction;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
|
||||
import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction;
|
||||
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAction;
|
||||
|
@ -428,6 +432,8 @@ public class ActionModule extends AbstractModule {
|
|||
|
||||
actions.register(GetDiscoveredNodesAction.INSTANCE, TransportGetDiscoveredNodesAction.class);
|
||||
actions.register(BootstrapClusterAction.INSTANCE, TransportBootstrapClusterAction.class);
|
||||
actions.register(AddVotingTombstonesAction.INSTANCE, TransportAddVotingTombstonesAction.class);
|
||||
actions.register(ClearVotingTombstonesAction.INSTANCE, TransportClearVotingTombstonesAction.class);
|
||||
actions.register(ClusterAllocationExplainAction.INSTANCE, TransportClusterAllocationExplainAction.class);
|
||||
actions.register(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class);
|
||||
actions.register(ClusterStateAction.INSTANCE, TransportClusterStateAction.class);
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action.admin.cluster.configuration;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
||||
|
||||
public class AddVotingTombstonesAction extends Action<AddVotingTombstonesResponse> {
|
||||
public static final AddVotingTombstonesAction INSTANCE = new AddVotingTombstonesAction();
|
||||
public static final String NAME = "cluster:admin/voting/add_tombstones";
|
||||
|
||||
private AddVotingTombstonesAction() {
|
||||
super(NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AddVotingTombstonesResponse newResponse() {
|
||||
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Reader<AddVotingTombstonesResponse> getResponseReader() {
|
||||
return AddVotingTombstonesResponse::new;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,138 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action.admin.cluster.configuration;
|
||||
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.support.master.MasterNodeRequest;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* A request to add voting tombstones for certain master-eligible nodes, and wait for these nodes to be removed from the voting
|
||||
* configuration.
|
||||
*/
|
||||
public class AddVotingTombstonesRequest extends MasterNodeRequest<AddVotingTombstonesRequest> {
|
||||
private final String[] nodeDescriptions;
|
||||
private final TimeValue timeout;
|
||||
|
||||
/**
|
||||
* Construct a request to add voting tombstones for master-eligible nodes matching the given descriptions, and wait for a default 30
|
||||
* seconds for these nodes to be removed from the voting configuration.
|
||||
* @param nodeDescriptions Descriptions of the nodes to add - see {@link DiscoveryNodes#resolveNodes(String...)}
|
||||
*/
|
||||
public AddVotingTombstonesRequest(String[] nodeDescriptions) {
|
||||
this(nodeDescriptions, TimeValue.timeValueSeconds(30));
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a request to add voting tombstones for master-eligible nodes matching the given descriptions, and wait for these nodes to
|
||||
* be removed from the voting configuration.
|
||||
* @param nodeDescriptions Descriptions of the nodes whose tombstones to add - see {@link DiscoveryNodes#resolveNodes(String...)}.
|
||||
* @param timeout How long to wait for the nodes to be removed from the voting configuration.
|
||||
*/
|
||||
public AddVotingTombstonesRequest(String[] nodeDescriptions, TimeValue timeout) {
|
||||
if (timeout.compareTo(TimeValue.ZERO) < 0) {
|
||||
throw new IllegalArgumentException("timeout [" + timeout + "] must be non-negative");
|
||||
}
|
||||
this.nodeDescriptions = nodeDescriptions;
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
public AddVotingTombstonesRequest(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
nodeDescriptions = in.readStringArray();
|
||||
timeout = in.readTimeValue();
|
||||
}
|
||||
|
||||
Set<DiscoveryNode> resolveNodes(ClusterState currentState) {
|
||||
final DiscoveryNodes allNodes = currentState.nodes();
|
||||
final Set<DiscoveryNode> resolvedNodes = Arrays.stream(allNodes.resolveNodes(nodeDescriptions))
|
||||
.map(allNodes::get).filter(DiscoveryNode::isMasterNode).collect(Collectors.toSet());
|
||||
|
||||
if (resolvedNodes.isEmpty()) {
|
||||
throw new IllegalArgumentException("add voting tombstones request for " + Arrays.asList(nodeDescriptions)
|
||||
+ " matched no master-eligible nodes");
|
||||
}
|
||||
|
||||
resolvedNodes.removeIf(n -> currentState.getVotingTombstones().contains(n));
|
||||
return resolvedNodes;
|
||||
}
|
||||
|
||||
Set<DiscoveryNode> resolveNodesAndCheckMaximum(ClusterState currentState, int maxTombstoneCount, String maximumSettingKey) {
|
||||
final Set<DiscoveryNode> resolvedNodes = resolveNodes(currentState);
|
||||
|
||||
final int oldTombstoneCount = currentState.getVotingTombstones().size();
|
||||
final int newTombstoneCount = resolvedNodes.size();
|
||||
if (oldTombstoneCount + newTombstoneCount > maxTombstoneCount) {
|
||||
throw new IllegalArgumentException("add voting tombstones request for " + Arrays.asList(nodeDescriptions)
|
||||
+ " would add [" + newTombstoneCount + "] voting tombstones to the existing [" + oldTombstoneCount
|
||||
+ "] which would exceed the maximum of [" + maxTombstoneCount + "] set by ["
|
||||
+ maximumSettingKey + "]");
|
||||
}
|
||||
return resolvedNodes;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return descriptions of the nodes for whom to add tombstones.
|
||||
*/
|
||||
public String[] getNodeDescriptions() {
|
||||
return nodeDescriptions;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return how long to wait after adding the tombstones for the nodes to be removed from the voting configuration.
|
||||
*/
|
||||
public TimeValue getTimeout() {
|
||||
return timeout;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeStringArray(nodeDescriptions);
|
||||
out.writeTimeValue(timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "AddVotingTombstonesRequest{" +
|
||||
"nodeDescriptions=" + Arrays.asList(nodeDescriptions) +
|
||||
", timeout=" + timeout +
|
||||
'}';
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action.admin.cluster.configuration;
|
||||
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* A response to {@link AddVotingTombstonesRequest} indicating that voting tombstones have been added for the requested nodes and these
|
||||
* nodes have been removed from the voting configuration.
|
||||
*/
|
||||
public class AddVotingTombstonesResponse extends ActionResponse {
|
||||
|
||||
public AddVotingTombstonesResponse() {
|
||||
}
|
||||
|
||||
public AddVotingTombstonesResponse(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action.admin.cluster.configuration;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
||||
|
||||
public class ClearVotingTombstonesAction extends Action<ClearVotingTombstonesResponse> {
|
||||
public static final ClearVotingTombstonesAction INSTANCE = new ClearVotingTombstonesAction();
|
||||
public static final String NAME = "cluster:admin/voting/clear_tombstones";
|
||||
|
||||
private ClearVotingTombstonesAction() {
|
||||
super(NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClearVotingTombstonesResponse newResponse() {
|
||||
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Reader<ClearVotingTombstonesResponse> getResponseReader() {
|
||||
return ClearVotingTombstonesResponse::new;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,103 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action.admin.cluster.configuration;
|
||||
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.support.master.MasterNodeRequest;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* A request to clear the voting tombstones from the cluster state, optionally waiting for these nodes to be removed from the cluster first.
|
||||
*/
|
||||
public class ClearVotingTombstonesRequest extends MasterNodeRequest<ClearVotingTombstonesRequest> {
|
||||
private boolean waitForRemoval = true;
|
||||
private TimeValue timeout = TimeValue.timeValueSeconds(30);
|
||||
|
||||
/**
|
||||
* Construct a request to remove all the voting tombstones from the cluster state.
|
||||
*/
|
||||
public ClearVotingTombstonesRequest() {
|
||||
}
|
||||
|
||||
public ClearVotingTombstonesRequest(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
waitForRemoval = in.readBoolean();
|
||||
timeout = in.readTimeValue();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return whether to wait for the tombstoned nodes to be removed from the cluster before removing their tombstones. True by default.
|
||||
*/
|
||||
public boolean getWaitForRemoval() {
|
||||
return waitForRemoval;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param waitForRemoval whether to wait for the tombstoned nodes to be removed from the cluster before removing their tombstones. True
|
||||
* by default.
|
||||
*/
|
||||
public void setWaitForRemoval(boolean waitForRemoval) {
|
||||
this.waitForRemoval = waitForRemoval;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param timeout how long to wait for the tombstoned nodes to be removed if {@link ClearVotingTombstonesRequest#waitForRemoval} is
|
||||
* true. Defaults to 30 seconds.
|
||||
*/
|
||||
public void setTimeout(TimeValue timeout) {
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return how long to wait for the tombstoned nodes to be removed if {@link ClearVotingTombstonesRequest#waitForRemoval} is
|
||||
* true. Defaults to 30 seconds.
|
||||
*/
|
||||
public TimeValue getTimeout() {
|
||||
return timeout;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeBoolean(waitForRemoval);
|
||||
out.writeTimeValue(timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ClearVotingTombstonesRequest{" +
|
||||
", waitForRemoval=" + waitForRemoval +
|
||||
", timeout=" + timeout +
|
||||
'}';
|
||||
}
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action.admin.cluster.configuration;
|
||||
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* A response to {@link ClearVotingTombstonesRequest} indicating that voting tombstones have been cleared from the cluster state.
|
||||
*/
|
||||
public class ClearVotingTombstonesResponse extends ActionResponse {
|
||||
public ClearVotingTombstonesResponse() {
|
||||
}
|
||||
|
||||
public ClearVotingTombstonesResponse(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,155 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action.admin.cluster.configuration;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterState.Builder;
|
||||
import org.elasticsearch.cluster.ClusterStateObserver;
|
||||
import org.elasticsearch.cluster.ClusterStateObserver.Listener;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool.Names;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Set;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class TransportAddVotingTombstonesAction extends TransportMasterNodeAction<AddVotingTombstonesRequest, AddVotingTombstonesResponse> {
|
||||
|
||||
public static final Setting<Integer> MAXIMUM_VOTING_TOMBSTONES_SETTING
|
||||
= Setting.intSetting("cluster.max_voting_tombstones", 10, 1, Property.Dynamic, Property.NodeScope);
|
||||
|
||||
@Inject
|
||||
public TransportAddVotingTombstonesAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
super(AddVotingTombstonesAction.NAME, transportService, clusterService, threadPool, actionFilters, AddVotingTombstonesRequest::new,
|
||||
indexNameExpressionResolver);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String executor() {
|
||||
return Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AddVotingTombstonesResponse newResponse() {
|
||||
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AddVotingTombstonesResponse read(StreamInput in) throws IOException {
|
||||
return new AddVotingTombstonesResponse(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void masterOperation(AddVotingTombstonesRequest request, ClusterState state,
|
||||
ActionListener<AddVotingTombstonesResponse> listener) throws Exception {
|
||||
|
||||
resolveNodesAndCheckMaximum(request, state); // throws IllegalArgumentException if no nodes matched or maximum exceeded
|
||||
|
||||
clusterService.submitStateUpdateTask("add-voting-tombstones", new ClusterStateUpdateTask(Priority.URGENT) {
|
||||
|
||||
private Set<DiscoveryNode> resolvedNodes;
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
assert resolvedNodes == null : resolvedNodes;
|
||||
resolvedNodes = resolveNodesAndCheckMaximum(request, currentState);
|
||||
|
||||
final Builder builder = ClusterState.builder(currentState);
|
||||
resolvedNodes.forEach(builder::addVotingTombstone);
|
||||
final ClusterState newState = builder.build();
|
||||
assert newState.getVotingTombstones().size() <= MAXIMUM_VOTING_TOMBSTONES_SETTING.get(currentState.metaData().settings());
|
||||
return newState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
|
||||
final ClusterStateObserver observer
|
||||
= new ClusterStateObserver(clusterService, request.getTimeout(), logger, threadPool.getThreadContext());
|
||||
|
||||
final Set<String> resolvedNodeIds = resolvedNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet());
|
||||
|
||||
final Predicate<ClusterState> allNodesRemoved = clusterState -> {
|
||||
final Set<String> votingNodeIds = clusterState.getLastCommittedConfiguration().getNodeIds();
|
||||
return resolvedNodeIds.stream().noneMatch(votingNodeIds::contains);
|
||||
};
|
||||
|
||||
final Listener clusterStateListener = new Listener() {
|
||||
@Override
|
||||
public void onNewClusterState(ClusterState state) {
|
||||
listener.onResponse(new AddVotingTombstonesResponse());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClusterServiceClose() {
|
||||
listener.onFailure(new ElasticsearchException("cluster service closed while waiting for withdrawal of votes from "
|
||||
+ resolvedNodes));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTimeout(TimeValue timeout) {
|
||||
listener.onFailure(new ElasticsearchTimeoutException("timed out waiting for withdrawal of votes from "
|
||||
+ resolvedNodes));
|
||||
}
|
||||
};
|
||||
|
||||
if (allNodesRemoved.test(newState)) {
|
||||
clusterStateListener.onNewClusterState(newState);
|
||||
} else {
|
||||
observer.waitForNextChange(clusterStateListener, allNodesRemoved);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private static Set<DiscoveryNode> resolveNodesAndCheckMaximum(AddVotingTombstonesRequest request, ClusterState state) {
|
||||
return request.resolveNodesAndCheckMaximum(state,
|
||||
MAXIMUM_VOTING_TOMBSTONES_SETTING.get(state.metaData().settings()), MAXIMUM_VOTING_TOMBSTONES_SETTING.getKey());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(AddVotingTombstonesRequest request, ClusterState state) {
|
||||
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,148 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action.admin.cluster.configuration;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterState.Builder;
|
||||
import org.elasticsearch.cluster.ClusterStateObserver;
|
||||
import org.elasticsearch.cluster.ClusterStateObserver.Listener;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool.Names;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
public class TransportClearVotingTombstonesAction
|
||||
extends TransportMasterNodeAction<ClearVotingTombstonesRequest, ClearVotingTombstonesResponse> {
|
||||
|
||||
@Inject
|
||||
public TransportClearVotingTombstonesAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
super(ClearVotingTombstonesAction.NAME, transportService, clusterService, threadPool, actionFilters,
|
||||
ClearVotingTombstonesRequest::new, indexNameExpressionResolver);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String executor() {
|
||||
return Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClearVotingTombstonesResponse newResponse() {
|
||||
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClearVotingTombstonesResponse read(StreamInput in) throws IOException {
|
||||
return new ClearVotingTombstonesResponse(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void masterOperation(ClearVotingTombstonesRequest request, ClusterState initialState,
|
||||
ActionListener<ClearVotingTombstonesResponse> listener) throws Exception {
|
||||
|
||||
final long startTimeMillis = threadPool.relativeTimeInMillis();
|
||||
|
||||
final Predicate<ClusterState> allTombstonedNodesRemoved = newState -> {
|
||||
for (DiscoveryNode tombstone : initialState.getVotingTombstones()) {
|
||||
// NB checking for the existence of any node with this persistent ID, because persistent IDs are how votes are counted.
|
||||
// Calling nodeExists(tombstone) is insufficient because this compares on the ephemeral ID.
|
||||
if (newState.nodes().nodeExists(tombstone.getId())) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
};
|
||||
|
||||
if (request.getWaitForRemoval() && allTombstonedNodesRemoved.test(initialState) == false) {
|
||||
final ClusterStateObserver clusterStateObserver = new ClusterStateObserver(initialState, clusterService, request.getTimeout(),
|
||||
logger, threadPool.getThreadContext());
|
||||
|
||||
clusterStateObserver.waitForNextChange(new Listener() {
|
||||
@Override
|
||||
public void onNewClusterState(ClusterState state) {
|
||||
submitClearTombstonesTask(request, startTimeMillis, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClusterServiceClose() {
|
||||
listener.onFailure(new ElasticsearchException("cluster service closed while waiting for removal of nodes "
|
||||
+ initialState.getVotingTombstones()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTimeout(TimeValue timeout) {
|
||||
listener.onFailure(new ElasticsearchTimeoutException(
|
||||
"timed out waiting for removal of nodes; if nodes should not be removed, set waitForRemoval to false. "
|
||||
+ initialState.getVotingTombstones()));
|
||||
}
|
||||
}, allTombstonedNodesRemoved);
|
||||
} else {
|
||||
submitClearTombstonesTask(request, startTimeMillis, listener);
|
||||
}
|
||||
}
|
||||
|
||||
private void submitClearTombstonesTask(ClearVotingTombstonesRequest request, long startTimeMillis,
|
||||
ActionListener<ClearVotingTombstonesResponse> listener) {
|
||||
clusterService.submitStateUpdateTask("clear-voting-tombstones", new ClusterStateUpdateTask(Priority.URGENT) {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
final Builder builder = ClusterState.builder(currentState);
|
||||
builder.clearVotingTombstones();
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimeValue timeout() {
|
||||
return TimeValue.timeValueMillis(request.getTimeout().millis() + startTimeMillis - threadPool.relativeTimeInMillis());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
listener.onResponse(new ClearVotingTombstonesResponse());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(ClearVotingTombstonesRequest request, ClusterState state) {
|
||||
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
|
||||
}
|
||||
}
|
|
@ -62,6 +62,7 @@ import org.elasticsearch.discovery.Discovery;
|
|||
import org.elasticsearch.discovery.zen.PublishClusterStateAction;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
|
@ -72,6 +73,7 @@ import java.util.Map;
|
|||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Represents the current state of the cluster.
|
||||
|
@ -186,18 +188,21 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
|
||||
private final VotingConfiguration lastAcceptedConfiguration;
|
||||
|
||||
private final Set<DiscoveryNode> votingTombstones;
|
||||
|
||||
// built on demand
|
||||
private volatile RoutingNodes routingNodes;
|
||||
|
||||
public ClusterState(long term, long version, String stateUUID, ClusterState state) {
|
||||
this(state.clusterName, term, version, stateUUID, state.metaData(), state.routingTable(), state.nodes(), state.blocks(),
|
||||
state.customs(), state.getLastCommittedConfiguration(), state.getLastAcceptedConfiguration(), false);
|
||||
state.customs(), state.getLastCommittedConfiguration(), state.getLastAcceptedConfiguration(), state.getVotingTombstones(),
|
||||
false);
|
||||
}
|
||||
|
||||
public ClusterState(ClusterName clusterName, long term, long version, String stateUUID, MetaData metaData, RoutingTable routingTable,
|
||||
DiscoveryNodes nodes, ClusterBlocks blocks, ImmutableOpenMap<String, Custom> customs,
|
||||
VotingConfiguration lastCommittedConfiguration, VotingConfiguration lastAcceptedConfiguration,
|
||||
boolean wasReadFromDiff) {
|
||||
Set<DiscoveryNode> votingTombstones, boolean wasReadFromDiff) {
|
||||
this.term = term;
|
||||
this.version = version;
|
||||
this.stateUUID = stateUUID;
|
||||
|
@ -209,6 +214,7 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
this.customs = customs;
|
||||
this.lastCommittedConfiguration = lastCommittedConfiguration;
|
||||
this.lastAcceptedConfiguration = lastAcceptedConfiguration;
|
||||
this.votingTombstones = votingTombstones;
|
||||
this.wasReadFromDiff = wasReadFromDiff;
|
||||
}
|
||||
|
||||
|
@ -288,6 +294,10 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
return lastCommittedConfiguration;
|
||||
}
|
||||
|
||||
public Set<DiscoveryNode> getVotingTombstones() {
|
||||
return votingTombstones;
|
||||
}
|
||||
|
||||
// Used for testing and logging to determine how this cluster state was send over the wire
|
||||
public boolean wasReadFromDiff() {
|
||||
return wasReadFromDiff;
|
||||
|
@ -313,6 +323,7 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
sb.append("state uuid: ").append(stateUUID).append("\n");
|
||||
sb.append("last committed config: ").append(getLastCommittedConfiguration()).append("\n");
|
||||
sb.append("last accepted config: ").append(getLastAcceptedConfiguration()).append("\n");
|
||||
sb.append("voting tombstones: ").append(votingTombstones).append("\n");
|
||||
sb.append("from_diff: ").append(wasReadFromDiff).append("\n");
|
||||
sb.append("meta data version: ").append(metaData.version()).append("\n");
|
||||
final String TAB = " ";
|
||||
|
@ -428,6 +439,7 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
builder.field("state_uuid", stateUUID);
|
||||
builder.field("last_committed_config", lastCommittedConfiguration);
|
||||
builder.field("last_accepted_config", lastAcceptedConfiguration);
|
||||
// TODO include voting tombstones here
|
||||
}
|
||||
|
||||
if (metrics.contains(Metric.MASTER_NODE)) {
|
||||
|
@ -637,6 +649,7 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
private DiscoveryNodes nodes = DiscoveryNodes.EMPTY_NODES;
|
||||
private ClusterBlocks blocks = ClusterBlocks.EMPTY_CLUSTER_BLOCK;
|
||||
private final ImmutableOpenMap.Builder<String, Custom> customs;
|
||||
private final Set<DiscoveryNode> votingTombstones = new HashSet<>();
|
||||
private boolean fromDiff;
|
||||
|
||||
|
||||
|
@ -647,6 +660,7 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
this.uuid = state.stateUUID();
|
||||
this.lastCommittedConfiguration = state.getLastCommittedConfiguration();
|
||||
this.lastAcceptedConfiguration = state.getLastAcceptedConfiguration();
|
||||
this.votingTombstones.addAll(state.getVotingTombstones());
|
||||
this.nodes = state.nodes();
|
||||
this.routingTable = state.routingTable();
|
||||
this.metaData = state.metaData();
|
||||
|
@ -752,7 +766,8 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
uuid = UUIDs.randomBase64UUID();
|
||||
}
|
||||
return new ClusterState(clusterName, term, version, uuid, metaData, routingTable, nodes, blocks, customs.build(),
|
||||
lastCommittedConfiguration, lastAcceptedConfiguration, fromDiff);
|
||||
lastCommittedConfiguration, lastAcceptedConfiguration, Collections.unmodifiableSet(new HashSet<>(votingTombstones)),
|
||||
fromDiff);
|
||||
}
|
||||
|
||||
public static byte[] toBytes(ClusterState state) throws IOException {
|
||||
|
@ -770,6 +785,14 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
return readFrom(in, localNode);
|
||||
|
||||
}
|
||||
|
||||
public void addVotingTombstone(DiscoveryNode tombstone) {
|
||||
votingTombstones.add(tombstone);
|
||||
}
|
||||
|
||||
public void clearVotingTombstones() {
|
||||
votingTombstones.clear();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -792,6 +815,7 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||
builder.lastCommittedConfiguration(new VotingConfiguration(in));
|
||||
builder.lastAcceptedConfiguration(new VotingConfiguration(in));
|
||||
in.readSet(DiscoveryNode::new).forEach(builder::addVotingTombstone);
|
||||
}
|
||||
builder.metaData = MetaData.readFrom(in);
|
||||
builder.routingTable = RoutingTable.readFrom(in);
|
||||
|
@ -816,6 +840,7 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||
lastCommittedConfiguration.writeTo(out);
|
||||
lastAcceptedConfiguration.writeTo(out);
|
||||
out.writeCollection(votingTombstones, (o, v) -> v.writeTo(o));
|
||||
}
|
||||
metaData.writeTo(out);
|
||||
routingTable.writeTo(out);
|
||||
|
@ -852,6 +877,8 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
|
||||
private final VotingConfiguration lastAcceptedConfiguration;
|
||||
|
||||
private final Set<DiscoveryNode> votingTombstones;
|
||||
|
||||
private final Diff<RoutingTable> routingTable;
|
||||
|
||||
private final Diff<DiscoveryNodes> nodes;
|
||||
|
@ -870,6 +897,7 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
clusterName = after.clusterName;
|
||||
lastCommittedConfiguration = after.lastCommittedConfiguration;
|
||||
lastAcceptedConfiguration = after.lastAcceptedConfiguration;
|
||||
votingTombstones = after.votingTombstones;
|
||||
routingTable = after.routingTable.diff(before.routingTable);
|
||||
nodes = after.nodes.diff(before.nodes);
|
||||
metaData = after.metaData.diff(before.metaData);
|
||||
|
@ -890,9 +918,11 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||
lastCommittedConfiguration = new VotingConfiguration(in);
|
||||
lastAcceptedConfiguration = new VotingConfiguration(in);
|
||||
votingTombstones = in.readSet(DiscoveryNode::new);
|
||||
} else {
|
||||
lastCommittedConfiguration = VotingConfiguration.EMPTY_CONFIG;
|
||||
lastAcceptedConfiguration = VotingConfiguration.EMPTY_CONFIG;
|
||||
votingTombstones = Collections.emptySet();
|
||||
}
|
||||
routingTable = RoutingTable.readDiffFrom(in);
|
||||
nodes = DiscoveryNodes.readDiffFrom(in, localNode);
|
||||
|
@ -913,6 +943,7 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||
lastCommittedConfiguration.writeTo(out);
|
||||
lastAcceptedConfiguration.writeTo(out);
|
||||
out.writeCollection(votingTombstones, (o, v) -> v.writeTo(o));
|
||||
}
|
||||
routingTable.writeTo(out);
|
||||
nodes.writeTo(out);
|
||||
|
@ -936,6 +967,7 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
builder.version(toVersion);
|
||||
builder.lastCommittedConfiguration(lastCommittedConfiguration);
|
||||
builder.lastAcceptedConfiguration(lastAcceptedConfiguration);
|
||||
votingTombstones.forEach(builder::addVotingTombstone);
|
||||
builder.routingTable(routingTable.apply(state.routingTable));
|
||||
builder.nodes(nodes.apply(state.nodes));
|
||||
builder.metaData(metaData.apply(state.metaData));
|
||||
|
@ -1009,5 +1041,10 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
|
|||
}
|
||||
return builder.endArray();
|
||||
}
|
||||
|
||||
public static VotingConfiguration of(DiscoveryNode... nodes) {
|
||||
// this could be used in many more places - TODO use this where appropriate
|
||||
return new VotingConfiguration(Arrays.stream(nodes).map(DiscoveryNode::getId).collect(Collectors.toSet()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -76,7 +76,6 @@ import java.util.function.Supplier;
|
|||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
import static java.util.Collections.emptySet;
|
||||
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
|
||||
import static org.elasticsearch.discovery.DiscoverySettings.NO_MASTER_BLOCK_WRITES;
|
||||
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
|
||||
|
@ -642,8 +641,9 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|||
|
||||
final Set<DiscoveryNode> liveNodes = StreamSupport.stream(clusterState.nodes().spliterator(), false)
|
||||
.filter(this::hasJoinVoteFrom).collect(Collectors.toSet());
|
||||
final ClusterState.VotingConfiguration newConfig = reconfigurator.reconfigure(
|
||||
liveNodes, emptySet(), clusterState.getLastAcceptedConfiguration());
|
||||
final ClusterState.VotingConfiguration newConfig = reconfigurator.reconfigure(liveNodes,
|
||||
clusterState.getVotingTombstones().stream().map(DiscoveryNode::getId).collect(Collectors.toSet()),
|
||||
clusterState.getLastAcceptedConfiguration());
|
||||
if (newConfig.equals(clusterState.getLastAcceptedConfiguration()) == false) {
|
||||
assert coordinationState.get().joinVotesHaveQuorumFor(newConfig);
|
||||
return ClusterState.builder(clusterState).lastAcceptedConfiguration(newConfig).build();
|
||||
|
|
|
@ -79,7 +79,8 @@ public abstract class AbstractScopedSettings extends AbstractComponent {
|
|||
Map<String, Setting<?>> keySettings = new HashMap<>();
|
||||
for (Setting<?> setting : settingsSet) {
|
||||
if (setting.getProperties().contains(scope) == false) {
|
||||
throw new IllegalArgumentException("Setting must be a " + scope + " setting but has: " + setting.getProperties());
|
||||
throw new IllegalArgumentException("Setting " + setting + " must be a "
|
||||
+ scope + " setting but has: " + setting.getProperties());
|
||||
}
|
||||
validateSettingKey(setting);
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.elasticsearch.common.settings;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingTombstonesAction;
|
||||
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
|
||||
import org.elasticsearch.action.search.TransportSearchAction;
|
||||
import org.elasticsearch.action.support.AutoCreateIndex;
|
||||
|
@ -457,7 +458,8 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
|||
ElectionSchedulerFactory.ELECTION_DURATION_SETTING,
|
||||
Coordinator.PUBLISH_TIMEOUT_SETTING,
|
||||
JoinHelper.JOIN_TIMEOUT_SETTING,
|
||||
Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION
|
||||
Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION,
|
||||
TransportAddVotingTombstonesAction.MAXIMUM_VOTING_TOMBSTONES_SETTING
|
||||
)));
|
||||
|
||||
public static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList(
|
||||
|
|
|
@ -62,7 +62,8 @@ import static org.mockito.Mockito.verifyZeroInteractions;
|
|||
|
||||
public class TransportBootstrapClusterActionTests extends ESTestCase {
|
||||
|
||||
private final ActionFilters EMPTY_FILTERS = new ActionFilters(emptySet());
|
||||
private static final ActionFilters EMPTY_FILTERS = new ActionFilters(emptySet());
|
||||
|
||||
private DiscoveryNode discoveryNode;
|
||||
private static ThreadPool threadPool;
|
||||
private TransportService transportService;
|
||||
|
|
|
@ -46,8 +46,9 @@ import org.elasticsearch.transport.TransportRequest;
|
|||
import org.elasticsearch.transport.TransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.TransportService.HandshakeResponse;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Random;
|
||||
|
@ -69,14 +70,25 @@ import static org.mockito.Mockito.verifyZeroInteractions;
|
|||
|
||||
public class TransportGetDiscoveredNodesActionTests extends ESTestCase {
|
||||
|
||||
private final ActionFilters EMPTY_FILTERS = new ActionFilters(emptySet());
|
||||
private static final ActionFilters EMPTY_FILTERS = new ActionFilters(emptySet());
|
||||
|
||||
private static ThreadPool threadPool;
|
||||
private DiscoveryNode localNode;
|
||||
private ThreadPool threadPool;
|
||||
private String clusterName;
|
||||
private TransportService transportService;
|
||||
private Coordinator coordinator;
|
||||
private DiscoveryNode otherNode;
|
||||
|
||||
@BeforeClass
|
||||
public static void createThreadPool() {
|
||||
threadPool = new TestThreadPool("test", Settings.EMPTY);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void shutdownThreadPool() {
|
||||
threadPool.shutdown();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setupTest() {
|
||||
clusterName = randomAlphaOfLength(10);
|
||||
|
@ -91,7 +103,6 @@ public class TransportGetDiscoveredNodesActionTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
};
|
||||
threadPool = new TestThreadPool("test", Settings.EMPTY);
|
||||
transportService = transport.createTransportService(
|
||||
Settings.builder().put(CLUSTER_NAME_SETTING.getKey(), clusterName).build(), threadPool,
|
||||
TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet());
|
||||
|
@ -104,17 +115,11 @@ public class TransportGetDiscoveredNodesActionTests extends ESTestCase {
|
|||
new NoOpClusterApplier(), new Random(random().nextLong()));
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanUp() {
|
||||
threadPool.shutdown();
|
||||
}
|
||||
|
||||
public void testHandlesNonstandardDiscoveryImplementation() throws InterruptedException {
|
||||
final Discovery discovery = mock(Discovery.class);
|
||||
verifyZeroInteractions(discovery);
|
||||
|
||||
new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, discovery); // registers action
|
||||
|
||||
transportService.start();
|
||||
transportService.acceptIncomingRequests();
|
||||
|
||||
|
|
|
@ -0,0 +1,106 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action.admin.cluster.configuration;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes.Builder;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.emptySet;
|
||||
import static java.util.Collections.singleton;
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class AddVotingTombstonesRequestTests extends ESTestCase {
|
||||
public void testSerialization() throws IOException {
|
||||
int descriptionCount = between(0, 5);
|
||||
String[] descriptions = new String[descriptionCount];
|
||||
for (int i = 0; i < descriptionCount; i++) {
|
||||
descriptions[i] = randomAlphaOfLength(10);
|
||||
}
|
||||
TimeValue timeout = TimeValue.timeValueMillis(between(0, 30000));
|
||||
final AddVotingTombstonesRequest originalRequest = new AddVotingTombstonesRequest(descriptions, timeout);
|
||||
final AddVotingTombstonesRequest deserialized = copyWriteable(originalRequest, writableRegistry(), AddVotingTombstonesRequest::new);
|
||||
assertThat(deserialized.getNodeDescriptions(), equalTo(originalRequest.getNodeDescriptions()));
|
||||
assertThat(deserialized.getTimeout(), equalTo(originalRequest.getTimeout()));
|
||||
}
|
||||
|
||||
public void testResolve() {
|
||||
final DiscoveryNode localNode
|
||||
= new DiscoveryNode("local", "local", buildNewFakeTransportAddress(), emptyMap(), singleton(Role.MASTER), Version.CURRENT);
|
||||
final DiscoveryNode otherNode1
|
||||
= new DiscoveryNode("other1", "other1", buildNewFakeTransportAddress(), emptyMap(), singleton(Role.MASTER), Version.CURRENT);
|
||||
final DiscoveryNode otherNode2
|
||||
= new DiscoveryNode("other2", "other2", buildNewFakeTransportAddress(), emptyMap(), singleton(Role.MASTER), Version.CURRENT);
|
||||
final DiscoveryNode otherDataNode
|
||||
= new DiscoveryNode("data", "data", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
|
||||
|
||||
final ClusterState clusterState = ClusterState.builder(new ClusterName("cluster")).nodes(new Builder()
|
||||
.add(localNode).add(otherNode1).add(otherNode2).add(otherDataNode).localNodeId(localNode.getId())).build();
|
||||
|
||||
assertThat(makeRequest().resolveNodes(clusterState), containsInAnyOrder(localNode, otherNode1, otherNode2));
|
||||
assertThat(makeRequest("_all").resolveNodes(clusterState), containsInAnyOrder(localNode, otherNode1, otherNode2));
|
||||
assertThat(makeRequest("_local").resolveNodes(clusterState), contains(localNode));
|
||||
assertThat(makeRequest("other*").resolveNodes(clusterState), containsInAnyOrder(otherNode1, otherNode2));
|
||||
|
||||
assertThat(expectThrows(IllegalArgumentException.class, () -> makeRequest("not-a-node").resolveNodes(clusterState)).getMessage(),
|
||||
equalTo("add voting tombstones request for [not-a-node] matched no master-eligible nodes"));
|
||||
}
|
||||
|
||||
public void testResolveAndCheckMaximum() {
|
||||
final DiscoveryNode localNode
|
||||
= new DiscoveryNode("local", "local", buildNewFakeTransportAddress(), emptyMap(), singleton(Role.MASTER), Version.CURRENT);
|
||||
final DiscoveryNode otherNode1
|
||||
= new DiscoveryNode("other1", "other1", buildNewFakeTransportAddress(), emptyMap(), singleton(Role.MASTER), Version.CURRENT);
|
||||
final DiscoveryNode otherNode2
|
||||
= new DiscoveryNode("other2", "other2", buildNewFakeTransportAddress(), emptyMap(), singleton(Role.MASTER), Version.CURRENT);
|
||||
|
||||
final ClusterState.Builder builder = ClusterState.builder(new ClusterName("cluster")).nodes(new Builder()
|
||||
.add(localNode).add(otherNode1).add(otherNode2).localNodeId(localNode.getId()));
|
||||
builder.addVotingTombstone(otherNode1);
|
||||
final ClusterState clusterState = builder.build();
|
||||
|
||||
assertThat(makeRequest().resolveNodesAndCheckMaximum(clusterState, 3, "setting.name"),
|
||||
containsInAnyOrder(localNode, otherNode2));
|
||||
assertThat(makeRequest("_local").resolveNodesAndCheckMaximum(clusterState, 2, "setting.name"),
|
||||
contains(localNode));
|
||||
|
||||
assertThat(expectThrows(IllegalArgumentException.class,
|
||||
() -> makeRequest().resolveNodesAndCheckMaximum(clusterState, 2, "setting.name")).getMessage(),
|
||||
equalTo("add voting tombstones request for [] would add [2] voting tombstones to the existing [1] which would exceed the " +
|
||||
"maximum of [2] set by [setting.name]"));
|
||||
assertThat(expectThrows(IllegalArgumentException.class,
|
||||
() -> makeRequest("_local").resolveNodesAndCheckMaximum(clusterState, 1, "setting.name")).getMessage(),
|
||||
equalTo("add voting tombstones request for [_local] would add [1] voting tombstones to the existing [1] which would exceed " +
|
||||
"the maximum of [1] set by [setting.name]"));
|
||||
}
|
||||
|
||||
private static AddVotingTombstonesRequest makeRequest(String... descriptions) {
|
||||
return new AddVotingTombstonesRequest(descriptions);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action.admin.cluster.configuration;
|
||||
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class AddVotingTombstonesResponseTests extends ESTestCase {
|
||||
public void testSerialization() throws IOException {
|
||||
final AddVotingTombstonesResponse originalRequest = new AddVotingTombstonesResponse();
|
||||
copyWriteable(originalRequest, writableRegistry(), AddVotingTombstonesResponse::new);
|
||||
// there are no fields so we're just checking that this doesn't throw anything
|
||||
}
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action.admin.cluster.configuration;
|
||||
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class ClearVotingTombstonesRequestTests extends ESTestCase {
|
||||
public void testSerialization() throws IOException {
|
||||
final ClearVotingTombstonesRequest originalRequest = new ClearVotingTombstonesRequest();
|
||||
if (randomBoolean()) {
|
||||
originalRequest.setWaitForRemoval(randomBoolean());
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
originalRequest.setTimeout(TimeValue.timeValueMillis(randomLongBetween(0, 30000)));
|
||||
}
|
||||
final ClearVotingTombstonesRequest deserialized
|
||||
= copyWriteable(originalRequest, writableRegistry(), ClearVotingTombstonesRequest::new);
|
||||
assertThat(deserialized.getWaitForRemoval(), equalTo(originalRequest.getWaitForRemoval()));
|
||||
assertThat(deserialized.getTimeout(), equalTo(originalRequest.getTimeout()));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action.admin.cluster.configuration;
|
||||
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class ClearVotingTombstonesResponseTests extends ESTestCase {
|
||||
public void testSerialization() throws IOException {
|
||||
final ClearVotingTombstonesResponse originalRequest = new ClearVotingTombstonesResponse();
|
||||
copyWriteable(originalRequest, writableRegistry(), ClearVotingTombstonesResponse::new);
|
||||
// there are no fields so we're just checking that this doesn't throw anything
|
||||
}
|
||||
}
|
|
@ -0,0 +1,405 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action.admin.cluster.configuration;
|
||||
|
||||
import org.apache.lucene.util.SetOnce;
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterState.VotingConfiguration;
|
||||
import org.elasticsearch.cluster.ClusterStateObserver;
|
||||
import org.elasticsearch.cluster.ClusterStateObserver.Listener;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes.Builder;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.transport.MockTransport;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool.Names;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.emptySet;
|
||||
import static java.util.Collections.singleton;
|
||||
import static org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingTombstonesAction.MAXIMUM_VOTING_TOMBSTONES_SETTING;
|
||||
import static org.elasticsearch.cluster.ClusterState.builder;
|
||||
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
|
||||
import static org.elasticsearch.test.ClusterServiceUtils.setState;
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
|
||||
public class TransportAddVotingTombstonesActionTests extends ESTestCase {
|
||||
|
||||
private static ThreadPool threadPool;
|
||||
private static ClusterService clusterService;
|
||||
private static DiscoveryNode localNode, otherNode1, otherNode2, otherDataNode;
|
||||
|
||||
private TransportService transportService;
|
||||
private ClusterStateObserver clusterStateObserver;
|
||||
|
||||
@BeforeClass
|
||||
public static void createThreadPoolAndClusterService() {
|
||||
threadPool = new TestThreadPool("test", Settings.EMPTY);
|
||||
localNode = makeDiscoveryNode("local");
|
||||
otherNode1 = makeDiscoveryNode("other1");
|
||||
otherNode2 = makeDiscoveryNode("other2");
|
||||
otherDataNode = new DiscoveryNode("data", "data", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
|
||||
clusterService = createClusterService(threadPool, localNode);
|
||||
}
|
||||
|
||||
private static DiscoveryNode makeDiscoveryNode(String name) {
|
||||
return new DiscoveryNode(name, name, buildNewFakeTransportAddress(), emptyMap(), singleton(Role.MASTER), Version.CURRENT);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void shutdownThreadPoolAndClusterService() {
|
||||
clusterService.stop();
|
||||
threadPool.shutdown();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setupForTest() {
|
||||
final MockTransport transport = new MockTransport();
|
||||
transportService = transport.createTransportService(Settings.EMPTY, threadPool,
|
||||
TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet());
|
||||
|
||||
new TransportAddVotingTombstonesAction(transportService, clusterService, threadPool, new ActionFilters(emptySet()),
|
||||
new IndexNameExpressionResolver()); // registers action
|
||||
|
||||
transportService.start();
|
||||
transportService.acceptIncomingRequests();
|
||||
|
||||
final VotingConfiguration allNodesConfig = VotingConfiguration.of(localNode, otherNode1, otherNode2);
|
||||
|
||||
setState(clusterService, builder(new ClusterName("cluster"))
|
||||
.nodes(new Builder().add(localNode).add(otherNode1).add(otherNode2).add(otherDataNode)
|
||||
.localNodeId(localNode.getId()).masterNodeId(localNode.getId()))
|
||||
.lastAcceptedConfiguration(allNodesConfig).lastCommittedConfiguration(allNodesConfig));
|
||||
|
||||
clusterStateObserver = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
|
||||
}
|
||||
|
||||
public void testWithdrawsVoteFromANode() throws InterruptedException {
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
|
||||
clusterStateObserver.waitForNextChange(new AdjustConfigurationForTombstones());
|
||||
transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME,
|
||||
new AddVotingTombstonesRequest(new String[]{"other1"}),
|
||||
expectSuccess(r -> {
|
||||
assertNotNull(r);
|
||||
countDownLatch.countDown();
|
||||
})
|
||||
);
|
||||
|
||||
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
|
||||
assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), contains(otherNode1));
|
||||
}
|
||||
|
||||
public void testWithdrawsVotesFromMultipleNodes() throws InterruptedException {
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
|
||||
clusterStateObserver.waitForNextChange(new AdjustConfigurationForTombstones());
|
||||
transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME,
|
||||
new AddVotingTombstonesRequest(new String[]{"other1", "other2"}),
|
||||
expectSuccess(r -> {
|
||||
assertNotNull(r);
|
||||
countDownLatch.countDown();
|
||||
})
|
||||
);
|
||||
|
||||
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
|
||||
assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), containsInAnyOrder(otherNode1, otherNode2));
|
||||
}
|
||||
|
||||
public void testWithdrawsVotesFromNodesMatchingWildcard() throws InterruptedException {
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
|
||||
clusterStateObserver.waitForNextChange(new AdjustConfigurationForTombstones());
|
||||
transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME,
|
||||
new AddVotingTombstonesRequest(new String[]{"other*"}),
|
||||
expectSuccess(r -> {
|
||||
assertNotNull(r);
|
||||
countDownLatch.countDown();
|
||||
})
|
||||
);
|
||||
|
||||
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
|
||||
assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), containsInAnyOrder(otherNode1, otherNode2));
|
||||
}
|
||||
|
||||
public void testWithdrawsVotesFromAllMasterEligibleNodes() throws InterruptedException {
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
|
||||
clusterStateObserver.waitForNextChange(new AdjustConfigurationForTombstones());
|
||||
transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME,
|
||||
new AddVotingTombstonesRequest(new String[]{"_all"}),
|
||||
expectSuccess(r -> {
|
||||
assertNotNull(r);
|
||||
countDownLatch.countDown();
|
||||
})
|
||||
);
|
||||
|
||||
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
|
||||
assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(),
|
||||
containsInAnyOrder(localNode, otherNode1, otherNode2));
|
||||
}
|
||||
|
||||
public void testWithdrawsVoteFromLocalNode() throws InterruptedException {
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
|
||||
clusterStateObserver.waitForNextChange(new AdjustConfigurationForTombstones());
|
||||
transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME,
|
||||
new AddVotingTombstonesRequest(new String[]{"_local"}),
|
||||
expectSuccess(r -> {
|
||||
assertNotNull(r);
|
||||
countDownLatch.countDown();
|
||||
})
|
||||
);
|
||||
|
||||
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
|
||||
assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), contains(localNode));
|
||||
}
|
||||
|
||||
public void testReturnsImmediatelyIfVoteAlreadyWithdrawn() throws InterruptedException {
|
||||
setState(clusterService, builder(clusterService.state())
|
||||
.lastCommittedConfiguration(VotingConfiguration.of(localNode, otherNode2))
|
||||
.lastAcceptedConfiguration(VotingConfiguration.of(localNode, otherNode2)));
|
||||
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
|
||||
// no observer to reconfigure
|
||||
transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME,
|
||||
new AddVotingTombstonesRequest(new String[]{"other1"}, TimeValue.ZERO),
|
||||
expectSuccess(r -> {
|
||||
assertNotNull(r);
|
||||
countDownLatch.countDown();
|
||||
})
|
||||
);
|
||||
|
||||
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
|
||||
assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), contains(otherNode1));
|
||||
}
|
||||
|
||||
public void testReturnsErrorIfNoMatchingNodes() throws InterruptedException {
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
final SetOnce<TransportException> exceptionHolder = new SetOnce<>();
|
||||
|
||||
transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME,
|
||||
new AddVotingTombstonesRequest(new String[]{"not-a-node"}),
|
||||
expectError(e -> {
|
||||
exceptionHolder.set(e);
|
||||
countDownLatch.countDown();
|
||||
})
|
||||
);
|
||||
|
||||
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
|
||||
final Throwable rootCause = exceptionHolder.get().getRootCause();
|
||||
assertThat(rootCause, instanceOf(IllegalArgumentException.class));
|
||||
assertThat(rootCause.getMessage(), equalTo("add voting tombstones request for [not-a-node] matched no master-eligible nodes"));
|
||||
}
|
||||
|
||||
public void testOnlyMatchesMasterEligibleNodes() throws InterruptedException {
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
final SetOnce<TransportException> exceptionHolder = new SetOnce<>();
|
||||
|
||||
transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME,
|
||||
new AddVotingTombstonesRequest(new String[]{"_all", "master:false"}),
|
||||
expectError(e -> {
|
||||
exceptionHolder.set(e);
|
||||
countDownLatch.countDown();
|
||||
})
|
||||
);
|
||||
|
||||
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
|
||||
final Throwable rootCause = exceptionHolder.get().getRootCause();
|
||||
assertThat(rootCause, instanceOf(IllegalArgumentException.class));
|
||||
assertThat(rootCause.getMessage(),
|
||||
equalTo("add voting tombstones request for [_all, master:false] matched no master-eligible nodes"));
|
||||
}
|
||||
|
||||
public void testSucceedsEvenIfAllTombstonesAlreadyAdded() throws InterruptedException {
|
||||
final ClusterState.Builder builder = builder(clusterService.state());
|
||||
builder.addVotingTombstone(otherNode1);
|
||||
setState(clusterService, builder);
|
||||
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
|
||||
transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME,
|
||||
new AddVotingTombstonesRequest(new String[]{"other1"}),
|
||||
expectSuccess(r -> {
|
||||
assertNotNull(r);
|
||||
countDownLatch.countDown();
|
||||
})
|
||||
);
|
||||
|
||||
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
|
||||
assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), contains(otherNode1));
|
||||
}
|
||||
|
||||
public void testReturnsErrorIfMaximumTombstoneCountExceeded() throws InterruptedException {
|
||||
final ClusterState.Builder builder = builder(clusterService.state())
|
||||
.metaData(MetaData.builder(clusterService.state().metaData()).persistentSettings(
|
||||
Settings.builder().put(clusterService.state().metaData().persistentSettings())
|
||||
.put(MAXIMUM_VOTING_TOMBSTONES_SETTING.getKey(), 2).build()));
|
||||
builder.addVotingTombstone(localNode);
|
||||
final int existingCount, newCount;
|
||||
if (randomBoolean()) {
|
||||
builder.addVotingTombstone(otherNode1);
|
||||
existingCount = 2;
|
||||
newCount = 1;
|
||||
} else {
|
||||
existingCount = 1;
|
||||
newCount = 2;
|
||||
}
|
||||
setState(clusterService, builder);
|
||||
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
final SetOnce<TransportException> exceptionHolder = new SetOnce<>();
|
||||
|
||||
transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME,
|
||||
new AddVotingTombstonesRequest(new String[]{"other*"}),
|
||||
expectError(e -> {
|
||||
exceptionHolder.set(e);
|
||||
countDownLatch.countDown();
|
||||
})
|
||||
);
|
||||
|
||||
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
|
||||
final Throwable rootCause = exceptionHolder.get().getRootCause();
|
||||
assertThat(rootCause, instanceOf(IllegalArgumentException.class));
|
||||
assertThat(rootCause.getMessage(), equalTo("add voting tombstones request for [other*] would add [" + newCount +
|
||||
"] voting tombstones to the existing [" + existingCount +
|
||||
"] which would exceed the maximum of [2] set by [cluster.max_voting_tombstones]"));
|
||||
}
|
||||
|
||||
public void testTimesOut() throws InterruptedException {
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
final SetOnce<TransportException> exceptionHolder = new SetOnce<>();
|
||||
|
||||
transportService.sendRequest(localNode, AddVotingTombstonesAction.NAME,
|
||||
new AddVotingTombstonesRequest(new String[]{"other1"}, TimeValue.timeValueMillis(100)),
|
||||
expectError(e -> {
|
||||
exceptionHolder.set(e);
|
||||
countDownLatch.countDown();
|
||||
})
|
||||
);
|
||||
|
||||
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
|
||||
final Throwable rootCause = exceptionHolder.get().getRootCause();
|
||||
assertThat(rootCause,instanceOf(ElasticsearchTimeoutException.class));
|
||||
assertThat(rootCause.getMessage(), startsWith("timed out waiting for withdrawal of votes from [{other1}"));
|
||||
}
|
||||
|
||||
private TransportResponseHandler<AddVotingTombstonesResponse> expectSuccess(Consumer<AddVotingTombstonesResponse> onResponse) {
|
||||
return responseHandler(onResponse, e -> {
|
||||
throw new AssertionError("unexpected", e);
|
||||
});
|
||||
}
|
||||
|
||||
private TransportResponseHandler<AddVotingTombstonesResponse> expectError(Consumer<TransportException> onException) {
|
||||
return responseHandler(r -> {
|
||||
assert false : r;
|
||||
}, onException);
|
||||
}
|
||||
|
||||
private TransportResponseHandler<AddVotingTombstonesResponse> responseHandler(Consumer<AddVotingTombstonesResponse> onResponse,
|
||||
Consumer<TransportException> onException) {
|
||||
return new TransportResponseHandler<AddVotingTombstonesResponse>() {
|
||||
@Override
|
||||
public void handleResponse(AddVotingTombstonesResponse response) {
|
||||
onResponse.accept(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
onException.accept(exp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AddVotingTombstonesResponse read(StreamInput in) throws IOException {
|
||||
return new AddVotingTombstonesResponse(in);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private class AdjustConfigurationForTombstones implements Listener {
|
||||
@Override
|
||||
public void onNewClusterState(ClusterState state) {
|
||||
clusterService.getMasterService().submitStateUpdateTask("reconfiguration", new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
assertThat(currentState, sameInstance(state));
|
||||
final Set<String> votingNodeIds = new HashSet<>();
|
||||
currentState.nodes().forEach(n -> votingNodeIds.add(n.getId()));
|
||||
currentState.getVotingTombstones().forEach(t -> votingNodeIds.remove(t.getId()));
|
||||
final VotingConfiguration votingConfiguration = new VotingConfiguration(votingNodeIds);
|
||||
return builder(currentState)
|
||||
.lastAcceptedConfiguration(votingConfiguration)
|
||||
.lastCommittedConfiguration(votingConfiguration).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
throw new AssertionError("unexpected failure", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClusterServiceClose() {
|
||||
throw new AssertionError("unexpected close");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTimeout(TimeValue timeout) {
|
||||
throw new AssertionError("unexpected timeout");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,202 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action.admin.cluster.configuration;
|
||||
|
||||
import org.apache.lucene.util.SetOnce;
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes.Builder;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.transport.MockTransport;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool.Names;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.emptySet;
|
||||
import static org.elasticsearch.cluster.ClusterState.builder;
|
||||
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
|
||||
import static org.elasticsearch.test.ClusterServiceUtils.setState;
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
|
||||
public class TransportClearVotingTombstonesActionTests extends ESTestCase {
|
||||
|
||||
private static ThreadPool threadPool;
|
||||
private static ClusterService clusterService;
|
||||
private static DiscoveryNode localNode, otherNode1, otherNode2;
|
||||
|
||||
private TransportService transportService;
|
||||
|
||||
@BeforeClass
|
||||
public static void createThreadPoolAndClusterService() {
|
||||
threadPool = new TestThreadPool("test", Settings.EMPTY);
|
||||
localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT);
|
||||
otherNode1 = new DiscoveryNode("other1", "other1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
|
||||
otherNode2 = new DiscoveryNode("other2", "other2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
|
||||
clusterService = createClusterService(threadPool, localNode);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void shutdownThreadPoolAndClusterService() {
|
||||
clusterService.stop();
|
||||
threadPool.shutdown();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setupForTest() {
|
||||
final MockTransport transport = new MockTransport();
|
||||
transportService = transport.createTransportService(Settings.EMPTY, threadPool,
|
||||
TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet());
|
||||
|
||||
new TransportClearVotingTombstonesAction(transportService, clusterService, threadPool, new ActionFilters(emptySet()),
|
||||
new IndexNameExpressionResolver()); // registers action
|
||||
|
||||
transportService.start();
|
||||
transportService.acceptIncomingRequests();
|
||||
|
||||
final ClusterState.Builder builder = builder(new ClusterName("cluster"))
|
||||
.nodes(new Builder().add(localNode).add(otherNode1).add(otherNode2)
|
||||
.localNodeId(localNode.getId()).masterNodeId(localNode.getId()));
|
||||
builder.addVotingTombstone(otherNode1);
|
||||
builder.addVotingTombstone(otherNode2);
|
||||
setState(clusterService, builder);
|
||||
}
|
||||
|
||||
public void testClearsVotingTombstones() throws InterruptedException {
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
final SetOnce<ClearVotingTombstonesResponse> responseHolder = new SetOnce<>();
|
||||
|
||||
final ClearVotingTombstonesRequest clearVotingTombstonesRequest = new ClearVotingTombstonesRequest();
|
||||
clearVotingTombstonesRequest.setWaitForRemoval(false);
|
||||
transportService.sendRequest(localNode, ClearVotingTombstonesAction.NAME,
|
||||
clearVotingTombstonesRequest,
|
||||
expectSuccess(r -> {
|
||||
responseHolder.set(r);
|
||||
countDownLatch.countDown();
|
||||
})
|
||||
);
|
||||
|
||||
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
|
||||
assertNotNull(responseHolder.get());
|
||||
assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), empty());
|
||||
}
|
||||
|
||||
public void testTimesOutIfWaitingForNodesThatAreNotRemoved() throws InterruptedException {
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
final SetOnce<TransportException> responseHolder = new SetOnce<>();
|
||||
|
||||
final ClearVotingTombstonesRequest clearVotingTombstonesRequest = new ClearVotingTombstonesRequest();
|
||||
clearVotingTombstonesRequest.setTimeout(TimeValue.timeValueMillis(100));
|
||||
transportService.sendRequest(localNode, ClearVotingTombstonesAction.NAME,
|
||||
clearVotingTombstonesRequest,
|
||||
expectError(e -> {
|
||||
responseHolder.set(e);
|
||||
countDownLatch.countDown();
|
||||
})
|
||||
);
|
||||
|
||||
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
|
||||
assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), containsInAnyOrder(otherNode1, otherNode2));
|
||||
final Throwable rootCause = responseHolder.get().getRootCause();
|
||||
assertThat(rootCause, instanceOf(ElasticsearchTimeoutException.class));
|
||||
assertThat(rootCause.getMessage(),
|
||||
startsWith("timed out waiting for removal of nodes; if nodes should not be removed, set waitForRemoval to false. ["));
|
||||
}
|
||||
|
||||
public void testSucceedsIfNodesAreRemovedWhileWaiting() throws InterruptedException {
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
final SetOnce<ClearVotingTombstonesResponse> responseHolder = new SetOnce<>();
|
||||
|
||||
transportService.sendRequest(localNode, ClearVotingTombstonesAction.NAME,
|
||||
new ClearVotingTombstonesRequest(),
|
||||
expectSuccess(r -> {
|
||||
responseHolder.set(r);
|
||||
countDownLatch.countDown();
|
||||
})
|
||||
);
|
||||
|
||||
final ClusterState.Builder builder = builder(clusterService.state());
|
||||
builder.nodes(DiscoveryNodes.builder(clusterService.state().nodes()).remove(otherNode1).remove(otherNode2));
|
||||
setState(clusterService, builder);
|
||||
|
||||
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
|
||||
assertThat(clusterService.getClusterApplierService().state().getVotingTombstones(), empty());
|
||||
}
|
||||
|
||||
private TransportResponseHandler<ClearVotingTombstonesResponse> expectSuccess(Consumer<ClearVotingTombstonesResponse> onResponse) {
|
||||
return responseHandler(onResponse, e -> {
|
||||
throw new AssertionError("unexpected", e);
|
||||
});
|
||||
}
|
||||
|
||||
private TransportResponseHandler<ClearVotingTombstonesResponse> expectError(Consumer<TransportException> onException) {
|
||||
return responseHandler(r -> {
|
||||
assert false : r;
|
||||
}, onException);
|
||||
}
|
||||
|
||||
private TransportResponseHandler<ClearVotingTombstonesResponse> responseHandler(Consumer<ClearVotingTombstonesResponse> onResponse,
|
||||
Consumer<TransportException> onException) {
|
||||
return new TransportResponseHandler<ClearVotingTombstonesResponse>() {
|
||||
@Override
|
||||
public void handleResponse(ClearVotingTombstonesResponse response) {
|
||||
onResponse.accept(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
onException.accept(exp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClearVotingTombstonesResponse read(StreamInput in) throws IOException {
|
||||
return new ClearVotingTombstonesResponse(in);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
|
@ -45,6 +45,7 @@ import org.elasticsearch.test.DummyShardLock;
|
|||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.InternalSettingsPlugin;
|
||||
import org.elasticsearch.test.InternalTestCluster;
|
||||
import org.elasticsearch.test.discovery.TestZenDiscovery;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -66,6 +67,13 @@ import static org.hamcrest.Matchers.not;
|
|||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0)
|
||||
public class AllocationIdIT extends ESIntegTestCase {
|
||||
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
|
||||
.put(TestZenDiscovery.USE_ZEN2.getKey(), true)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return Arrays.asList(MockTransportService.TestPlugin.class, MockEngineFactoryPlugin.class, InternalSettingsPlugin.class);
|
||||
|
@ -224,15 +232,17 @@ public class AllocationIdIT extends ESIntegTestCase {
|
|||
}
|
||||
|
||||
private void checkNoValidShardCopy(String indexName, ShardId shardId) throws Exception {
|
||||
final ClusterAllocationExplanation explanation =
|
||||
client().admin().cluster().prepareAllocationExplain()
|
||||
.setIndex(indexName).setShard(shardId.id()).setPrimary(true)
|
||||
.get().getExplanation();
|
||||
assertBusy(() -> {
|
||||
final ClusterAllocationExplanation explanation =
|
||||
client().admin().cluster().prepareAllocationExplain()
|
||||
.setIndex(indexName).setShard(shardId.id()).setPrimary(true)
|
||||
.get().getExplanation();
|
||||
|
||||
final ShardAllocationDecision shardAllocationDecision = explanation.getShardAllocationDecision();
|
||||
assertThat(shardAllocationDecision.isDecisionTaken(), equalTo(true));
|
||||
assertThat(shardAllocationDecision.getAllocateDecision().getAllocationDecision(),
|
||||
equalTo(AllocationDecision.NO_VALID_SHARD_COPY));
|
||||
final ShardAllocationDecision shardAllocationDecision = explanation.getShardAllocationDecision();
|
||||
assertThat(shardAllocationDecision.isDecisionTaken(), equalTo(true));
|
||||
assertThat(shardAllocationDecision.getAllocateDecision().getAllocationDecision(),
|
||||
equalTo(AllocationDecision.NO_VALID_SHARD_COPY));
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -27,11 +27,14 @@ import com.carrotsearch.randomizedtesting.SysGlobals;
|
|||
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
|
||||
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
|
||||
import com.carrotsearch.randomizedtesting.generators.RandomStrings;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.admin.cluster.configuration.AddVotingTombstonesAction;
|
||||
import org.elasticsearch.action.admin.cluster.configuration.AddVotingTombstonesRequest;
|
||||
import org.elasticsearch.action.admin.cluster.configuration.ClearVotingTombstonesAction;
|
||||
import org.elasticsearch.action.admin.cluster.configuration.ClearVotingTombstonesRequest;
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
|
||||
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
|
||||
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag;
|
||||
|
@ -1622,18 +1625,51 @@ public final class InternalTestCluster extends TestCluster {
|
|||
}
|
||||
|
||||
private synchronized void stopNodesAndClients(Collection<NodeAndClient> nodeAndClients) throws IOException {
|
||||
final Set<String> withdrawnNodeIds = new HashSet<>();
|
||||
|
||||
if (autoManageMinMasterNodes && nodeAndClients.size() > 0) {
|
||||
int masters = (int)nodeAndClients.stream().filter(NodeAndClient::isMasterEligible).count();
|
||||
if (masters > 0) {
|
||||
updateMinMasterNodes(getMasterNodesCount() - masters);
|
||||
|
||||
final long currentMasters = nodes.values().stream().filter(NodeAndClient::isMasterEligible).count();
|
||||
final long stoppingMasters = nodeAndClients.stream().filter(NodeAndClient::isMasterEligible).count();
|
||||
|
||||
assert stoppingMasters <= currentMasters : currentMasters + " < " + stoppingMasters;
|
||||
if (stoppingMasters != currentMasters && stoppingMasters > 0) {
|
||||
// If stopping few enough master-nodes that there's still a majority left, there is no need to withdraw their votes first.
|
||||
// However, we do not yet have a way to be sure there's a majority left, because the voting configuration may not yet have
|
||||
// been updated when the previous nodes shut down, so we must always explicitly withdraw votes.
|
||||
// TODO add cluster health API to check that voting configuration is optimal so this isn't always needed
|
||||
nodeAndClients.stream().filter(NodeAndClient::isMasterEligible).map(NodeAndClient::getName).forEach(withdrawnNodeIds::add);
|
||||
assert withdrawnNodeIds.size() == stoppingMasters;
|
||||
|
||||
logger.info("withdrawing votes from {} prior to shutdown", withdrawnNodeIds);
|
||||
try {
|
||||
client().execute(AddVotingTombstonesAction.INSTANCE,
|
||||
new AddVotingTombstonesRequest(withdrawnNodeIds.toArray(new String[0]))).get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new AssertionError("unexpected", e);
|
||||
}
|
||||
}
|
||||
|
||||
if (stoppingMasters > 0) {
|
||||
updateMinMasterNodes(getMasterNodesCount() - Math.toIntExact(stoppingMasters));
|
||||
}
|
||||
}
|
||||
|
||||
for (NodeAndClient nodeAndClient: nodeAndClients) {
|
||||
removeDisruptionSchemeFromNode(nodeAndClient);
|
||||
NodeAndClient previous = nodes.remove(nodeAndClient.name);
|
||||
assert previous == nodeAndClient;
|
||||
nodeAndClient.close();
|
||||
}
|
||||
|
||||
if (withdrawnNodeIds.isEmpty() == false) {
|
||||
logger.info("removing voting tombstones for {} after shutdown", withdrawnNodeIds);
|
||||
try {
|
||||
client().execute(ClearVotingTombstonesAction.INSTANCE, new ClearVotingTombstonesRequest()).get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new AssertionError("unexpected", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.test.test;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.elasticsearch.test.discovery.TestZenDiscovery;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
|
||||
@ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
|
||||
public class InternalTestClusterIT extends ESIntegTestCase {
|
||||
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
|
||||
.put(TestZenDiscovery.USE_ZEN2.getKey(), true)
|
||||
.build();
|
||||
}
|
||||
|
||||
public void testStartingAndStoppingNodes() throws IOException {
|
||||
logger.info("--> cluster has [{}] nodes", internalCluster().size());
|
||||
if (internalCluster().size() < 5) {
|
||||
final int nodesToStart = randomIntBetween(Math.max(2, internalCluster().size() + 1), 5);
|
||||
logger.info("--> growing to [{}] nodes", nodesToStart);
|
||||
internalCluster().startNodes(nodesToStart);
|
||||
}
|
||||
ensureGreen();
|
||||
|
||||
while (internalCluster().size() > 1) {
|
||||
final int nodesToRemain = randomIntBetween(1, internalCluster().size() - 1);
|
||||
logger.info("--> reducing to [{}] nodes", nodesToRemain);
|
||||
internalCluster().ensureAtMostNumDataNodes(nodesToRemain);
|
||||
assertThat(internalCluster().size(), lessThanOrEqualTo(nodesToRemain));
|
||||
}
|
||||
|
||||
ensureGreen();
|
||||
}
|
||||
|
||||
public void testStoppingNodesOneByOne() throws IOException {
|
||||
// In a 5+ node cluster there must be at least one reconfiguration as the nodes are shut down one-by-one before we drop to 2 nodes.
|
||||
// If the nodes shut down too quickly then this reconfiguration does not have time to occur and the quorum is lost in the 3->2
|
||||
// transition, even though in a stable cluster the 3->2 transition requires no special treatment.
|
||||
|
||||
internalCluster().startNodes(5);
|
||||
ensureGreen();
|
||||
|
||||
while (internalCluster().size() > 1) {
|
||||
internalCluster().stopRandomNode(s -> true);
|
||||
}
|
||||
|
||||
ensureGreen();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue