Bootstrap a Zen2 cluster once quorum is discovered (#37463)

Today when bootstrapping a Zen2 cluster we wait for every node in the
`initial_master_nodes` setting to be discovered, so that we can map the
node names or addresses in the `initial_master_nodes` list to their IDs for
inclusion in the initial voting configuration. This means that if any of
the expected master-eligible nodes fails to start then bootstrapping will
not occur and the cluster will not form. This is not ideal, and we would
prefer the cluster to bootstrap even if some of the master-eligible nodes
do not start.

Safe bootstrapping requires that all pairs of quorums of all initial
configurations overlap, and this is particularly troublesome to ensure
given that nodes may be concurrently and independently attempting to
bootstrap the cluster. The solution is to bootstrap using an initial
configuration whose size matches the size of the expected set of
master-eligible nodes, but with the unknown IDs replaced by "placeholder"
IDs that can never belong to any node.  Any quorum of received votes in any
of these placeholder-laden initial configurations is also a quorum of the
"true" initial set of master-eligible nodes, giving the guarantee that it
intersects all other quorums as required.

Note that this change means that the initial configuration is not
necessarily robust to any node failures. Normally the cluster will form and
then auto-reconfigure to a more robust configuration in which the
placeholder IDs are replaced by the IDs of genuine nodes as they join the
cluster; however if a node fails between bootstrapping and this
auto-reconfiguration then the cluster may become unavailable. We consider
this to be less likely than a node failing to start at all.

This commit also enormously simplifies the cluster bootstrapping process.
Today, the cluster bootstrapping process involves two (local) transport actions
in order to support a flexible bootstrapping API and to make it easily
accessible to plugins. However, this flexibility is not required by the current
design, so it adds a good deal of unnecessary complexity. Here we remove
this complexity in favour of a much simpler ClusterBootstrapService
implementation that does all the work itself.
This commit is contained in:
David Turner 2019-01-22 11:03:51 +00:00 committed by GitHub
parent e9fcb25a28
commit 5db7ed22a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 585 additions and 2470 deletions

View File

@ -1009,10 +1009,8 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
MultiBucketConsumerService.TooManyBucketsException::new, 149, Version.V_7_0_0),
COORDINATION_STATE_REJECTED_EXCEPTION(org.elasticsearch.cluster.coordination.CoordinationStateRejectedException.class,
org.elasticsearch.cluster.coordination.CoordinationStateRejectedException::new, 150, Version.V_7_0_0),
CLUSTER_ALREADY_BOOTSTRAPPED_EXCEPTION(org.elasticsearch.cluster.coordination.ClusterAlreadyBootstrappedException.class,
org.elasticsearch.cluster.coordination.ClusterAlreadyBootstrappedException::new, 151, Version.V_7_0_0),
SNAPSHOT_IN_PROGRESS_EXCEPTION(org.elasticsearch.snapshots.SnapshotInProgressException.class,
org.elasticsearch.snapshots.SnapshotInProgressException::new, 152, Version.V_7_0_0);
org.elasticsearch.snapshots.SnapshotInProgressException::new, 151, Version.V_7_0_0);
final Class<? extends ElasticsearchException> exceptionClass;
final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;

View File

@ -23,10 +23,6 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainAction;
import org.elasticsearch.action.admin.cluster.allocation.TransportClusterAllocationExplainAction;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterAction;
import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesAction;
import org.elasticsearch.action.admin.cluster.bootstrap.TransportBootstrapClusterAction;
import org.elasticsearch.action.admin.cluster.bootstrap.TransportGetDiscoveredNodesAction;
import org.elasticsearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
@ -433,8 +429,6 @@ public class ActionModule extends AbstractModule {
actions.register(GetTaskAction.INSTANCE, TransportGetTaskAction.class);
actions.register(CancelTasksAction.INSTANCE, TransportCancelTasksAction.class);
actions.register(GetDiscoveredNodesAction.INSTANCE, TransportGetDiscoveredNodesAction.class);
actions.register(BootstrapClusterAction.INSTANCE, TransportBootstrapClusterAction.class);
actions.register(AddVotingConfigExclusionsAction.INSTANCE, TransportAddVotingConfigExclusionsAction.class);
actions.register(ClearVotingConfigExclusionsAction.INSTANCE, TransportClearVotingConfigExclusionsAction.class);
actions.register(ClusterAllocationExplainAction.INSTANCE, TransportClusterAllocationExplainAction.class);

View File

@ -1,41 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.bootstrap;
import org.elasticsearch.action.Action;
import org.elasticsearch.common.io.stream.Writeable.Reader;
/**
 * Singleton action descriptor registered under {@link #NAME}. Responses are read from the wire through the
 * {@link BootstrapClusterResponse} stream constructor; the legacy {@code Streamable}-style factory is unsupported.
 */
public class BootstrapClusterAction extends Action<BootstrapClusterResponse> {

    /** Name under which this action is registered. */
    public static final String NAME = "cluster:admin/bootstrap/set_voting_config";

    /** The only instance of this action. */
    public static final BootstrapClusterAction INSTANCE = new BootstrapClusterAction();

    private BootstrapClusterAction() {
        super(NAME);
    }

    /**
     * @return a reader that deserializes a {@link BootstrapClusterResponse} from a stream.
     */
    @Override
    public Reader<BootstrapClusterResponse> getResponseReader() {
        return BootstrapClusterResponse::new;
    }

    /**
     * Not supported: responses are created through {@link #getResponseReader()} instead.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public BootstrapClusterResponse newResponse() {
        throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
    }
}

View File

@ -1,65 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.bootstrap;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
 * Request carrying the initial configuration of master-eligible nodes for a cluster, so that the very first master election can take
 * place.
 */
public class BootstrapClusterRequest extends ActionRequest {

    private final BootstrapConfiguration bootstrapConfiguration;

    /**
     * @param bootstrapConfiguration the bootstrap configuration to carry in this request.
     */
    public BootstrapClusterRequest(BootstrapConfiguration bootstrapConfiguration) {
        this.bootstrapConfiguration = bootstrapConfiguration;
    }

    /**
     * Deserializes a request: the superclass state is read first, followed by the bootstrap configuration.
     */
    public BootstrapClusterRequest(StreamInput in) throws IOException {
        super(in);
        this.bootstrapConfiguration = new BootstrapConfiguration(in);
    }

    /**
     * Serializes this request in the same order in which the stream constructor reads it.
     */
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        bootstrapConfiguration.writeTo(out);
    }

    /**
     * Not supported: use the {@link StreamInput} constructor instead.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void readFrom(StreamInput in) throws IOException {
        throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
    }

    /**
     * @return {@code null}, since this request performs no validation.
     */
    @Override
    public ActionRequestValidationException validate() {
        return null;
    }

    /**
     * @return the bootstrap configuration: the initial set of master-eligible nodes whose votes are counted in elections.
     */
    public BootstrapConfiguration getBootstrapConfiguration() {
        return bootstrapConfiguration;
    }
}

View File

@ -1,66 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.bootstrap;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
 * Successful outcome of a {@link BootstrapClusterRequest}: the cluster has been bootstrapped. The single flag records whether the
 * handling node already knew this beforehand.
 */
public class BootstrapClusterResponse extends ActionResponse {

    private final boolean alreadyBootstrapped;

    /**
     * @param alreadyBootstrapped whether the handling node already knew that the cluster had been bootstrapped.
     */
    public BootstrapClusterResponse(boolean alreadyBootstrapped) {
        this.alreadyBootstrapped = alreadyBootstrapped;
    }

    /**
     * Deserializes a response: the superclass state is read first, followed by the boolean flag.
     */
    public BootstrapClusterResponse(StreamInput in) throws IOException {
        super(in);
        this.alreadyBootstrapped = in.readBoolean();
    }

    /**
     * Serializes this response in the same order in which the stream constructor reads it.
     */
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeBoolean(alreadyBootstrapped);
    }

    /**
     * Not supported: use the {@link StreamInput} constructor instead.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void readFrom(StreamInput in) throws IOException {
        throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
    }

    /**
     * @return whether this node already knew that the cluster had been bootstrapped when handling this request.
     */
    public boolean getAlreadyBootstrapped() {
        return alreadyBootstrapped;
    }

    @Override
    public String toString() {
        final StringBuilder description = new StringBuilder("BootstrapClusterResponse{");
        description.append("alreadyBootstrapped=").append(alreadyBootstrapped);
        description.append('}');
        return description.toString();
    }
}

View File

@ -1,179 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.bootstrap;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
/**
 * A non-empty, unmodifiable list of {@link NodeDescription}s that can be resolved against a collection of discovered nodes to yield a
 * {@link VotingConfiguration} made of the IDs of the matching nodes.
 */
public class BootstrapConfiguration implements Writeable {

    private final List<NodeDescription> nodeDescriptions;

    /**
     * @param nodeDescriptions descriptions of the nodes in this configuration; the list is copied.
     * @throws IllegalArgumentException if {@code nodeDescriptions} is empty.
     */
    public BootstrapConfiguration(List<NodeDescription> nodeDescriptions) {
        if (nodeDescriptions.isEmpty()) {
            throw new IllegalArgumentException("cannot create empty bootstrap configuration");
        }
        final List<NodeDescription> copy = new ArrayList<>(nodeDescriptions);
        this.nodeDescriptions = Collections.unmodifiableList(copy);
    }

    /**
     * Deserializes a configuration from a list of node descriptions; emptiness is only checked by an assertion here.
     */
    public BootstrapConfiguration(StreamInput in) throws IOException {
        final List<NodeDescription> received = in.readList(NodeDescription::new);
        this.nodeDescriptions = Collections.unmodifiableList(received);
        assert nodeDescriptions.isEmpty() == false;
    }

    /**
     * @return the unmodifiable list of node descriptions.
     */
    public List<NodeDescription> getNodeDescriptions() {
        return nodeDescriptions;
    }

    /**
     * Resolves every description against the given nodes and collects the IDs of the matches.
     *
     * @param discoveredNodes the nodes to match the descriptions against.
     * @return a voting configuration containing the IDs of the matched nodes.
     * @throws ElasticsearchException if two descriptions resolve to the same node, or if any single description fails to resolve
     *                                (see {@link NodeDescription#resolve(Iterable)}).
     */
    public VotingConfiguration resolve(Iterable<DiscoveryNode> discoveredNodes) {
        final Set<DiscoveryNode> resolved = new HashSet<>();
        for (final NodeDescription description : nodeDescriptions) {
            final DiscoveryNode match = description.resolve(discoveredNodes);
            final boolean firstOccurrence = resolved.add(match);
            if (firstOccurrence == false) {
                throw new ElasticsearchException("multiple nodes matching {} in {}", match, this);
            }
        }

        final Set<String> ids = resolved.stream().map(DiscoveryNode::getId).collect(Collectors.toSet());
        assert ids.size() == resolved.size() : resolved + " does not contain distinct IDs";
        return new VotingConfiguration(ids);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeList(nodeDescriptions);
    }

    @Override
    public String toString() {
        return "BootstrapConfiguration{" +
            "nodeDescriptions=" + nodeDescriptions +
            '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        final BootstrapConfiguration other = (BootstrapConfiguration) o;
        return Objects.equals(nodeDescriptions, other.nodeDescriptions);
    }

    @Override
    public int hashCode() {
        return Objects.hash(nodeDescriptions);
    }

    /**
     * Describes a node by its (mandatory) name and, optionally, its ID.
     */
    public static class NodeDescription implements Writeable {

        @Nullable
        private final String id;

        private final String name;

        /**
         * @param id   the node ID, or {@code null} to match on name alone.
         * @param name the node name; must not be {@code null}.
         */
        public NodeDescription(@Nullable String id, String name) {
            this.id = id;
            this.name = Objects.requireNonNull(name);
        }

        /**
         * Describes the given node by both its ID and its name.
         */
        public NodeDescription(DiscoveryNode discoveryNode) {
            this(discoveryNode.getId(), discoveryNode.getName());
        }

        /**
         * Deserializes a description: an optional ID followed by the name.
         */
        public NodeDescription(StreamInput in) throws IOException {
            this(in.readOptionalString(), in.readString());
        }

        @Nullable
        public String getId() {
            return id;
        }

        public String getName() {
            return name;
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            out.writeOptionalString(id);
            out.writeString(name);
        }

        /**
         * Finds the unique node among {@code discoveredNodes} that matches this description.
         *
         * @param discoveredNodes the candidate nodes; each is asserted to be master-eligible.
         * @return the single node whose name equals this description's name and, when an ID is given, whose ID equals it too.
         * @throws ElasticsearchException if no node matches, if more than one node matches, or if a node matches on exactly one of
         *                                name and (non-null) ID.
         */
        public DiscoveryNode resolve(Iterable<DiscoveryNode> discoveredNodes) {
            DiscoveryNode match = null;
            for (final DiscoveryNode candidate : discoveredNodes) {
                assert candidate.isMasterNode() : candidate;

                final boolean sameName = candidate.getName().equals(name);
                final boolean sameId = id != null && id.equals(candidate.getId());

                if (sameName == false) {
                    // A different name is fine unless the candidate carries the ID we were told to expect.
                    if (sameId) {
                        throw new ElasticsearchException("node name mismatch comparing {} to {}", this, candidate);
                    }
                    continue;
                }

                // Same name: when an ID was specified it has to agree as well.
                if (id != null && sameId == false) {
                    throw new ElasticsearchException("node id mismatch comparing {} to {}", this, candidate);
                }

                if (match != null) {
                    throw new ElasticsearchException(
                        "discovered multiple nodes matching {} in {}", this, discoveredNodes);
                }
                match = candidate;
            }

            if (match == null) {
                throw new ElasticsearchException("no node matching {} found in {}", this, discoveredNodes);
            }
            return match;
        }

        @Override
        public String toString() {
            return "NodeDescription{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                '}';
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || o.getClass() != getClass()) {
                return false;
            }
            final NodeDescription other = (NodeDescription) o;
            return Objects.equals(id, other.id) && Objects.equals(name, other.name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, name);
        }
    }
}

View File

@ -1,41 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.bootstrap;
import org.elasticsearch.action.Action;
import org.elasticsearch.common.io.stream.Writeable.Reader;
/**
 * Singleton action descriptor registered under {@link #NAME}. Responses are read from the wire through the
 * {@link GetDiscoveredNodesResponse} stream constructor; the legacy {@code Streamable}-style factory is unsupported.
 */
public class GetDiscoveredNodesAction extends Action<GetDiscoveredNodesResponse> {

    /** Name under which this action is registered. */
    public static final String NAME = "cluster:admin/bootstrap/discover_nodes";

    /** The only instance of this action. */
    public static final GetDiscoveredNodesAction INSTANCE = new GetDiscoveredNodesAction();

    private GetDiscoveredNodesAction() {
        super(NAME);
    }

    /**
     * @return a reader that deserializes a {@link GetDiscoveredNodesResponse} from a stream.
     */
    @Override
    public Reader<GetDiscoveredNodesResponse> getResponseReader() {
        return GetDiscoveredNodesResponse::new;
    }

    /**
     * Not supported: responses are created through {@link #getResponseReader()} instead.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public GetDiscoveredNodesResponse newResponse() {
        throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
    }
}

View File

@ -1,119 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.bootstrap;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
/**
 * Asks a node for the set of master-eligible nodes it has discovered. Most useful in a brand-new cluster, as the step that precedes
 * setting the initial configuration with a {@link BootstrapClusterRequest}.
 */
public class GetDiscoveredNodesRequest extends ActionRequest {

    private List<String> requiredNodes = Collections.emptyList();

    @Nullable // if the request should wait indefinitely
    private TimeValue timeout = TimeValue.timeValueSeconds(30);

    /**
     * Creates a request with the default 30s timeout and no required nodes.
     */
    public GetDiscoveredNodesRequest() {
    }

    /**
     * Deserializes a request: the superclass state, then the optional timeout, then the list of required nodes.
     */
    public GetDiscoveredNodesRequest(StreamInput in) throws IOException {
        super(in);
        this.timeout = in.readOptionalTimeValue();
        this.requiredNodes = in.readList(StreamInput::readString);
    }

    /**
     * Serializes this request in the same order in which the stream constructor reads it.
     */
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeOptionalTimeValue(timeout);
        out.writeStringList(requiredNodes);
    }

    /**
     * Not supported: use the {@link StreamInput} constructor instead.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void readFrom(StreamInput in) throws IOException {
        throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
    }

    /**
     * @return {@code null}, since this request performs no validation.
     */
    @Override
    public ActionRequestValidationException validate() {
        return null;
    }

    /**
     * Rather than failing immediately it can be useful to wait until enough nodes have been discovered. This value bounds that wait
     * and defaults to 30s.
     *
     * @return how long to wait to discover sufficiently many nodes to respond successfully.
     */
    @Nullable
    public TimeValue getTimeout() {
        return timeout;
    }

    /**
     * Rather than failing immediately it can be useful to wait until enough nodes have been discovered. This value bounds that wait
     * and defaults to 30s.
     *
     * @param timeout how long to wait to discover sufficiently many nodes to respond successfully.
     * @throws IllegalArgumentException if {@code timeout} is negative.
     */
    public void setTimeout(@Nullable TimeValue timeout) {
        final boolean negative = timeout != null && timeout.compareTo(TimeValue.ZERO) < 0;
        if (negative) {
            throw new IllegalArgumentException("negative timeout of [" + timeout + "] is not allowed");
        }
        this.timeout = timeout;
    }

    /**
     * A successful response can be made conditional on having discovered a particular set of master-eligible nodes. This value holds
     * the names or transport addresses of those nodes.
     *
     * @return list of expected nodes
     */
    public List<String> getRequiredNodes() {
        return requiredNodes;
    }

    /**
     * A successful response can be made conditional on having discovered a particular set of master-eligible nodes. This value holds
     * the names or transport addresses of those nodes.
     *
     * @param requiredNodes list of expected nodes
     */
    public void setRequiredNodes(final List<String> requiredNodes) {
        this.requiredNodes = requiredNodes;
    }

    @Override
    public String toString() {
        return "GetDiscoveredNodesRequest{" +
            "timeout=" + timeout +
            ", requiredNodes=" + requiredNodes + "}";
    }
}

View File

@ -1,73 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.bootstrap;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration.NodeDescription;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
/**
 * Answer to a {@link GetDiscoveredNodesRequest}: the set of master-eligible nodes that were discovered.
 */
public class GetDiscoveredNodesResponse extends ActionResponse {

    private final Set<DiscoveryNode> nodes;

    /**
     * @param nodes the discovered nodes; the set is copied and exposed as an unmodifiable view.
     */
    public GetDiscoveredNodesResponse(Set<DiscoveryNode> nodes) {
        final Set<DiscoveryNode> copy = new HashSet<>(nodes);
        this.nodes = Collections.unmodifiableSet(copy);
    }

    /**
     * Deserializes a response: the superclass state is read first, followed by the set of nodes.
     */
    public GetDiscoveredNodesResponse(StreamInput in) throws IOException {
        super(in);
        final Set<DiscoveryNode> received = in.readSet(DiscoveryNode::new);
        this.nodes = Collections.unmodifiableSet(received);
    }

    /**
     * Serializes this response in the same order in which the stream constructor reads it.
     */
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeCollection(nodes);
    }

    /**
     * Not supported: use the {@link StreamInput} constructor instead.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void readFrom(StreamInput in) throws IOException {
        throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
    }

    /**
     * @return the set of nodes that were discovered.
     */
    public Set<DiscoveryNode> getNodes() {
        return nodes;
    }

    /**
     * @return a bootstrap configuration with one {@link NodeDescription} per discovered node, suitable for building a
     * {@link BootstrapClusterRequest}.
     */
    public BootstrapConfiguration getBootstrapConfiguration() {
        return new BootstrapConfiguration(
            nodes.stream()
                .map(NodeDescription::new)
                .collect(Collectors.toList()));
    }
}

View File

@ -1,87 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.bootstrap;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_TYPE_SETTING;
/**
 * Transport handler for {@link BootstrapClusterAction}: hands the requested bootstrap configuration to the {@link Coordinator} on the
 * generic thread pool and reports the outcome to the listener.
 */
public class TransportBootstrapClusterAction extends HandledTransportAction<BootstrapClusterRequest, BootstrapClusterResponse> {

    private final TransportService transportService;
    private final String discoveryType;

    @Nullable // TODO make this not nullable
    private final Coordinator coordinator;

    @Inject
    public TransportBootstrapClusterAction(Settings settings, ActionFilters actionFilters, TransportService transportService,
                                           Discovery discovery) {
        super(BootstrapClusterAction.NAME, transportService, actionFilters, BootstrapClusterRequest::new);
        this.transportService = transportService;
        this.discoveryType = DISCOVERY_TYPE_SETTING.get(settings);
        // Only a Coordinator supports bootstrapping; any other discovery implementation leaves this null.
        this.coordinator = discovery instanceof Coordinator ? (Coordinator) discovery : null;
    }

    /**
     * Sets the initial configuration asynchronously and responds with whether it had already been set.
     *
     * @throws IllegalArgumentException if the discovery implementation is not a {@link Coordinator}, or if the local node is not
     *                                  master-eligible.
     */
    @Override
    protected void doExecute(Task task, BootstrapClusterRequest request, ActionListener<BootstrapClusterResponse> listener) {
        if (coordinator == null) { // TODO remove when not nullable
            throw new IllegalArgumentException("cluster bootstrapping is not supported by discovery type [" + discoveryType + "]");
        }

        final DiscoveryNode localNode = transportService.getLocalNode();
        assert localNode != null;
        if (localNode.isMasterNode() == false) {
            throw new IllegalArgumentException(
                "this node is not master-eligible, but cluster bootstrapping can only happen on a master-eligible node");
        }

        final AbstractRunnable bootstrapTask = new AbstractRunnable() {
            @Override
            public void doRun() {
                final boolean configurationSet = coordinator.setInitialConfiguration(request.getBootstrapConfiguration());
                // The response flag is the negation of the coordinator's return value.
                listener.onResponse(new BootstrapClusterResponse(configurationSet == false));
            }

            @Override
            public void onFailure(Exception e) {
                listener.onFailure(e);
            }

            @Override
            public String toString() {
                return "setting initial configuration with " + request;
            }
        };
        transportService.getThreadPool().generic().execute(bootstrapTask);
    }
}

View File

@ -1,178 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.bootstrap;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.coordination.ClusterAlreadyBootstrappedException;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportService;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_TYPE_SETTING;
public class TransportGetDiscoveredNodesAction extends HandledTransportAction<GetDiscoveredNodesRequest, GetDiscoveredNodesResponse> {
@Nullable // TODO make this not nullable
private final Coordinator coordinator;
private final TransportService transportService;
private final String discoveryType;
/**
 * Registers this handler under {@link GetDiscoveredNodesAction#NAME} and records the discovery type and, when the discovery
 * implementation is a {@link Coordinator}, the coordinator itself (otherwise {@code null}).
 */
@Inject
public TransportGetDiscoveredNodesAction(Settings settings, ActionFilters actionFilters, TransportService transportService,
                                         Discovery discovery) {
    super(GetDiscoveredNodesAction.NAME, transportService, actionFilters,
        (Reader<GetDiscoveredNodesRequest>) GetDiscoveredNodesRequest::new);
    this.discoveryType = DISCOVERY_TYPE_SETTING.get(settings);
    this.transportService = transportService;
    this.coordinator = discovery instanceof Coordinator ? (Coordinator) discovery : null;
}
/**
 * Responds with the local node plus the nodes reported by the coordinator once {@code checkWaitRequirements} accepts them for this
 * request. Fails the listener with {@link ClusterAlreadyBootstrappedException} if the initial configuration is already set when the
 * request arrives, or with {@link ElasticsearchTimeoutException} if the request's (non-null) timeout elapses first.
 *
 * @param task     the task associated with this execution (not used here)
 * @param request  the request, supplying the required nodes and the optional timeout
 * @param listener notified at most once, with either the response or a failure
 * @throws IllegalArgumentException if the discovery implementation is not a {@link Coordinator}, or if the local node is not
 *                                  master-eligible
 */
@Override
protected void doExecute(Task task, GetDiscoveredNodesRequest request, ActionListener<GetDiscoveredNodesResponse> listener) {
if (coordinator == null) { // TODO remove when not nullable
throw new IllegalArgumentException("discovered nodes are not exposed by discovery type [" + discoveryType + "]");
}
final DiscoveryNode localNode = transportService.getLocalNode();
assert localNode != null;
if (localNode.isMasterNode() == false) {
throw new IllegalArgumentException(
"this node is not master-eligible, but discovered nodes are only exposed by master-eligible nodes");
}
final ExecutorService directExecutor = EsExecutors.newDirectExecutorService();
// Guards the future so that it is completed only once, whichever of response, failure or timeout gets there first.
final AtomicBoolean listenerNotified = new AtomicBoolean();
final ListenableFuture<GetDiscoveredNodesResponse> listenableFuture = new ListenableFuture<>();
final ThreadPool threadPool = transportService.getThreadPool();
// The caller's listener is attached to the future; everything below completes the future rather than the listener directly.
listenableFuture.addListener(listener, directExecutor, threadPool.getThreadContext());
// TODO make it so that listenableFuture copes with multiple completions, and then remove listenerNotified
final ActionListener<Iterable<DiscoveryNode>> respondIfRequestSatisfied = new ActionListener<Iterable<DiscoveryNode>>() {
@Override
public void onResponse(Iterable<DiscoveryNode> nodes) {
// The local node is always included, ahead of the nodes passed in.
final Set<DiscoveryNode> nodesSet = new LinkedHashSet<>();
nodesSet.add(localNode);
nodes.forEach(nodesSet::add);
logger.trace("discovered {}", nodesSet);
try {
// Respond only once the requirements are met; otherwise stay silent and wait for a later notification.
if (checkWaitRequirements(request, nodesSet)) {
final GetDiscoveredNodesResponse response = new GetDiscoveredNodesResponse(nodesSet);
if (listenerNotified.compareAndSet(false, true)) {
listenableFuture.onResponse(response);
}
}
} catch (Exception e) {
// Any exception thrown while checking the requirements or building the response fails the request.
onFailure(e);
}
}
@Override
public void onFailure(Exception e) {
if (listenerNotified.compareAndSet(false, true)) {
listenableFuture.onFailure(e);
}
}
@Override
public String toString() {
return "waiting for " + request;
}
};
// Register with the coordinator, and arrange for the registration to be released via the future.
// NOTE(review): presumably ActionListener.wrap(Runnable) runs on both success and failure - confirm.
final Releasable releasable = coordinator.withDiscoveryListener(respondIfRequestSatisfied);
listenableFuture.addListener(ActionListener.wrap(releasable::close), directExecutor, threadPool.getThreadContext());
// Initial check, made after registering the discovery listener above.
if (coordinator.isInitialConfigurationSet()) {
respondIfRequestSatisfied.onFailure(new ClusterAlreadyBootstrappedException());
} else {
respondIfRequestSatisfied.onResponse(coordinator.getFoundPeers());
}
// A null timeout means no timeout handler is scheduled.
if (request.getTimeout() != null) {
threadPool.schedule(request.getTimeout(), Names.SAME, new Runnable() {
@Override
public void run() {
respondIfRequestSatisfied.onFailure(new ElasticsearchTimeoutException("timed out while waiting for " + request));
}
@Override
public String toString() {
return "timeout handler for " + request;
}
});
}
}
private static boolean matchesRequirement(DiscoveryNode discoveryNode, String requirement) {
return discoveryNode.getName().equals(requirement)
|| discoveryNode.getAddress().toString().equals(requirement)
|| discoveryNode.getAddress().getAddress().equals(requirement);
}
private static boolean checkWaitRequirements(GetDiscoveredNodesRequest request, Set<DiscoveryNode> nodes) {
List<String> requirements = request.getRequiredNodes();
final Set<DiscoveryNode> selectedNodes = new HashSet<>();
for (final String requirement : requirements) {
final Set<DiscoveryNode> matchingNodes
= nodes.stream().filter(n -> matchesRequirement(n, requirement)).collect(Collectors.toSet());
if (matchingNodes.isEmpty()) {
return false;
}
if (matchingNodes.size() > 1) {
throw new IllegalArgumentException("[" + requirement + "] matches " + matchingNodes);
}
for (final DiscoveryNode matchingNode : matchingNodes) {
if (selectedNodes.add(matchingNode) == false) {
throw new IllegalArgumentException("[" + matchingNode + "] matches " +
requirements.stream().filter(r -> matchesRequirement(matchingNode, requirement)).collect(Collectors.toList()));
}
}
}
return true;
}
}

View File

@ -1,38 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;
import java.io.IOException;
/**
 * Exception thrown when trying to discover nodes in order to perform cluster bootstrapping, but a cluster is formed before all the
 * required nodes are discovered.
 */
public class ClusterAlreadyBootstrappedException extends ElasticsearchException {

    /**
     * Reads the exception from the given stream by delegating to the superclass stream constructor.
     */
    public ClusterAlreadyBootstrappedException(StreamInput in) throws IOException {
        super(in);
    }

    /**
     * Creates the exception with its fixed explanatory message.
     */
    public ClusterAlreadyBootstrappedException() {
        super("node has already joined a bootstrapped cluster, bootstrapping is not required");
    }
}

View File

@ -21,56 +21,73 @@ package org.elasticsearch.cluster.coordination;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterAction;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterRequest;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterResponse;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration;
import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesAction;
import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesRequest;
import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesResponse;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Collections;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static java.util.Collections.emptyList;
import static java.util.Collections.unmodifiableSet;
import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING;
import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING;
public class ClusterBootstrapService {
private static final Logger logger = LogManager.getLogger(ClusterBootstrapService.class);
public static final Setting<List<String>> INITIAL_MASTER_NODES_SETTING =
Setting.listSetting("cluster.initial_master_nodes", Collections.emptyList(), Function.identity(), Property.NodeScope);
Setting.listSetting("cluster.initial_master_nodes", emptyList(), Function.identity(), Property.NodeScope);
public static final Setting<TimeValue> UNCONFIGURED_BOOTSTRAP_TIMEOUT_SETTING =
Setting.timeSetting("discovery.unconfigured_bootstrap_timeout",
TimeValue.timeValueSeconds(3), TimeValue.timeValueMillis(1), Property.NodeScope);
private final List<String> initialMasterNodes;
@Nullable
static final String BOOTSTRAP_PLACEHOLDER_PREFIX = "{bootstrap-placeholder}-";
private static final Logger logger = LogManager.getLogger(ClusterBootstrapService.class);
private final Set<String> bootstrapRequirements;
@Nullable // null if discoveryIsConfigured()
private final TimeValue unconfiguredBootstrapTimeout;
private final TransportService transportService;
private volatile boolean running;
private final Supplier<Iterable<DiscoveryNode>> discoveredNodesSupplier;
private final BooleanSupplier isBootstrappedSupplier;
private final Consumer<VotingConfiguration> votingConfigurationConsumer;
private final AtomicBoolean bootstrappingPermitted = new AtomicBoolean(true);
/**
 * Creates the bootstrap service.
 *
 * @param settings                    node settings, from which the initial master nodes and the unconfigured bootstrap timeout are read
 * @param transportService            used to obtain the local node and the thread pool
 * @param discoveredNodesSupplier     supplies the nodes discovered so far
 * @param isBootstrappedSupplier      reports whether bootstrapping has already happened
 * @param votingConfigurationConsumer receives the initial voting configuration when bootstrapping
 * @throws IllegalArgumentException if the initial master nodes setting contains duplicate entries
 */
public ClusterBootstrapService(Settings settings, TransportService transportService,
                               Supplier<Iterable<DiscoveryNode>> discoveredNodesSupplier, BooleanSupplier isBootstrappedSupplier,
                               Consumer<VotingConfiguration> votingConfigurationConsumer) {
    final List<String> initialMasterNodes = INITIAL_MASTER_NODES_SETTING.get(settings);
    // A LinkedHashSet drops duplicates while keeping the configured order, so a size mismatch reveals duplicates.
    bootstrapRequirements = unmodifiableSet(new LinkedHashSet<>(initialMasterNodes));
    if (bootstrapRequirements.size() != initialMasterNodes.size()) {
        throw new IllegalArgumentException(
            "setting [" + INITIAL_MASTER_NODES_SETTING.getKey() + "] contains duplicates: " + initialMasterNodes);
    }

    // The timeout stays null when discovery is configured, which disables the best-effort unconfigured bootstrap.
    unconfiguredBootstrapTimeout = discoveryIsConfigured(settings) ? null : UNCONFIGURED_BOOTSTRAP_TIMEOUT_SETTING.get(settings);
    this.transportService = transportService;
    this.discoveredNodesSupplier = discoveredNodesSupplier;
    this.isBootstrappedSupplier = isBootstrappedSupplier;
    this.votingConfigurationConsumer = votingConfigurationConsumer;
}
public static boolean discoveryIsConfigured(Settings settings) {
@ -78,157 +95,135 @@ public class ClusterBootstrapService {
.anyMatch(s -> s.exists(settings));
}
public void start() {
assert running == false;
running = true;
/**
 * Re-evaluates the bootstrap requirements against the currently discovered nodes and starts bootstrapping once
 * more than half of the requirements are matched by discovered nodes.
 */
void onFoundPeersUpdated() {
    final Set<DiscoveryNode> nodes = getDiscoveredNodes();

    // Nothing to do unless this is a master-eligible node with requirements, not yet bootstrapped, with no Zen1 nodes around.
    if (transportService.getLocalNode().isMasterNode() == false || bootstrapRequirements.isEmpty()
        || isBootstrappedSupplier.getAsBoolean() || nodes.stream().anyMatch(Coordinator::isZen1Node)) {
        return;
    }

    final Tuple<Set<DiscoveryNode>,List<String>> matchResult;
    try {
        matchResult = checkRequirements(nodes);
    } catch (IllegalStateException e) {
        // Ambiguous requirements permanently disable bootstrapping from this node.
        logger.warn("bootstrapping cancelled", e);
        bootstrappingPermitted.set(false);
        return;
    }

    final Set<DiscoveryNode> matchedNodes = matchResult.v1();
    final List<String> unmatched = matchResult.v2();
    logger.trace("nodesMatchingRequirements={}, unsatisfiedRequirements={}, bootstrapRequirements={}",
        matchedNodes, unmatched, bootstrapRequirements);

    final boolean strictMajorityMatched = matchedNodes.size() * 2 > bootstrapRequirements.size();
    if (strictMajorityMatched) {
        startBootstrap(matchedNodes, unmatched);
    }
}
void scheduleUnconfiguredBootstrap() {
if (unconfiguredBootstrapTimeout == null) {
return;
}
if (transportService.getLocalNode().isMasterNode() == false) {
return;
}
if (unconfiguredBootstrapTimeout != null) {
logger.info("no discovery configuration found, will perform best-effort cluster bootstrapping after [{}] " +
"unless existing master is discovered", unconfiguredBootstrapTimeout);
final ThreadContext threadContext = transportService.getThreadPool().getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.markAsSystemContext();
logger.info("no discovery configuration found, will perform best-effort cluster bootstrapping after [{}] " +
"unless existing master is discovered", unconfiguredBootstrapTimeout);
transportService.getThreadPool().scheduleUnlessShuttingDown(unconfiguredBootstrapTimeout, Names.SAME, new Runnable() {
transportService.getThreadPool().scheduleUnlessShuttingDown(unconfiguredBootstrapTimeout, Names.GENERIC, new Runnable() {
@Override
public void run() {
final Set<DiscoveryNode> discoveredNodes = getDiscoveredNodes();
final List<DiscoveryNode> zen1Nodes = discoveredNodes.stream().filter(Coordinator::isZen1Node).collect(Collectors.toList());
if (zen1Nodes.isEmpty()) {
logger.debug("performing best-effort cluster bootstrapping with {}", discoveredNodes);
startBootstrap(discoveredNodes, emptyList());
} else {
logger.info("avoiding best-effort cluster bootstrapping due to discovery of pre-7.0 nodes {}", zen1Nodes);
}
}
@Override
public String toString() {
return "unconfigured-discovery delayed bootstrap";
}
});
}
/**
 * Returns the local node together with every node currently provided by the discovered-nodes supplier.
 */
private Set<DiscoveryNode> getDiscoveredNodes() {
    final Stream<DiscoveryNode> self = Stream.of(transportService.getLocalNode());
    final Stream<DiscoveryNode> peers = StreamSupport.stream(discoveredNodesSupplier.get().spliterator(), false);
    return Stream.concat(self, peers).collect(Collectors.toSet());
}
/**
 * Bootstraps at most once, using the IDs of the given nodes plus one placeholder ID per unsatisfied requirement.
 *
 * @param discoveryNodes          discovered nodes whose IDs go into the voting configuration
 * @param unsatisfiedRequirements requirements that matched no node; each becomes a placeholder ID
 */
private void startBootstrap(Set<DiscoveryNode> discoveryNodes, List<String> unsatisfiedRequirements) {
    assert discoveryNodes.stream().allMatch(DiscoveryNode::isMasterNode) : discoveryNodes;
    assert discoveryNodes.stream().noneMatch(Coordinator::isZen1Node) : discoveryNodes;
    assert unsatisfiedRequirements.size() < discoveryNodes.size() : discoveryNodes + " smaller than " + unsatisfiedRequirements;

    // Only the first caller to flip the flag proceeds; later calls are ignored.
    if (bootstrappingPermitted.compareAndSet(true, false) == false) {
        return;
    }

    final Stream<String> realIds = discoveryNodes.stream().map(DiscoveryNode::getId);
    final Stream<String> placeholderIds = unsatisfiedRequirements.stream().map(s -> BOOTSTRAP_PLACEHOLDER_PREFIX + s);
    final Set<String> votingNodeIds = Stream.concat(realIds, placeholderIds).collect(Collectors.toSet());
    doBootstrap(new VotingConfiguration(votingNodeIds));
}
/**
 * Tells whether the given node ID starts with the bootstrap placeholder prefix.
 *
 * @param nodeId the node ID to inspect
 * @return {@code true} if the ID begins with {@code BOOTSTRAP_PLACEHOLDER_PREFIX}
 */
public static boolean isBootstrapPlaceholder(String nodeId) {
    final boolean hasPlaceholderPrefix = nodeId.startsWith(BOOTSTRAP_PLACEHOLDER_PREFIX);
    return hasPlaceholderPrefix;
}
private void doBootstrap(VotingConfiguration votingConfiguration) {
assert transportService.getLocalNode().isMasterNode();
try {
votingConfigurationConsumer.accept(votingConfiguration);
} catch (Exception e) {
logger.warn(new ParameterizedMessage("exception when bootstrapping with {}, rescheduling", votingConfiguration), e);
transportService.getThreadPool().scheduleUnlessShuttingDown(TimeValue.timeValueSeconds(10), Names.GENERIC,
new Runnable() {
@Override
public void run() {
// TODO: remove the following line once schedule method properly preserves thread context
threadContext.markAsSystemContext();
final GetDiscoveredNodesRequest request = new GetDiscoveredNodesRequest();
logger.trace("sending {}", request);
transportService.sendRequest(transportService.getLocalNode(), GetDiscoveredNodesAction.NAME, request,
new TransportResponseHandler<GetDiscoveredNodesResponse>() {
@Override
public void handleResponse(GetDiscoveredNodesResponse response) {
logger.debug("discovered {}, starting to bootstrap", response.getNodes());
awaitBootstrap(response.getBootstrapConfiguration());
}
@Override
public void handleException(TransportException exp) {
final Throwable rootCause = exp.getRootCause();
if (rootCause instanceof ClusterAlreadyBootstrappedException) {
logger.debug(rootCause.getMessage(), rootCause);
} else {
logger.warn("discovery attempt failed", exp);
}
}
@Override
public String executor() {
return Names.SAME;
}
@Override
public GetDiscoveredNodesResponse read(StreamInput in) throws IOException {
return new GetDiscoveredNodesResponse(in);
}
});
doBootstrap(votingConfiguration);
}
@Override
public String toString() {
return "unconfigured-discovery delayed bootstrap";
return "retry of failed bootstrapping with " + votingConfiguration;
}
});
}
} else if (initialMasterNodes.isEmpty() == false) {
logger.debug("waiting for discovery of master-eligible nodes matching {}", initialMasterNodes);
final ThreadContext threadContext = transportService.getThreadPool().getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.markAsSystemContext();
final GetDiscoveredNodesRequest request = new GetDiscoveredNodesRequest();
request.setRequiredNodes(initialMasterNodes);
request.setTimeout(null);
logger.trace("sending {}", request);
transportService.sendRequest(transportService.getLocalNode(), GetDiscoveredNodesAction.NAME, request,
new TransportResponseHandler<GetDiscoveredNodesResponse>() {
@Override
public void handleResponse(GetDiscoveredNodesResponse response) {
assert response.getNodes().stream().allMatch(DiscoveryNode::isMasterNode);
logger.debug("discovered {}, starting to bootstrap", response.getNodes());
awaitBootstrap(response.getBootstrapConfiguration());
}
@Override
public void handleException(TransportException exp) {
logger.warn("discovery attempt failed", exp);
}
@Override
public String executor() {
return Names.SAME;
}
@Override
public GetDiscoveredNodesResponse read(StreamInput in) throws IOException {
return new GetDiscoveredNodesResponse(in);
}
});
}
}
);
}
}
public void stop() {
running = false;
/**
 * Returns whether the requirement equals the node's name, its address rendered as a string, or the address part
 * returned by {@code getAddress().getAddress()}. The checks run in that order and stop at the first match.
 */
private static boolean matchesRequirement(DiscoveryNode discoveryNode, String requirement) {
    if (discoveryNode.getName().equals(requirement)) {
        return true;
    }
    if (discoveryNode.getAddress().toString().equals(requirement)) {
        return true;
    }
    return discoveryNode.getAddress().getAddress().equals(requirement);
}
private void awaitBootstrap(final BootstrapConfiguration bootstrapConfiguration) {
if (running == false) {
logger.debug("awaitBootstrap: not running");
return;
/**
 * Matches each bootstrap requirement against the given nodes.
 *
 * @param nodes the candidate nodes
 * @return a tuple of the nodes that matched a requirement and the requirements that matched no node
 * @throws IllegalStateException if a requirement matches more than one node, or a node matches more than one requirement
 */
private Tuple<Set<DiscoveryNode>,List<String>> checkRequirements(Set<DiscoveryNode> nodes) {
    final Set<DiscoveryNode> selectedNodes = new HashSet<>();
    final List<String> unmatchedRequirements = new ArrayList<>();
    for (final String bootstrapRequirement : bootstrapRequirements) {
        final Set<DiscoveryNode> matchingNodes
            = nodes.stream().filter(n -> matchesRequirement(n, bootstrapRequirement)).collect(Collectors.toSet());

        if (matchingNodes.size() == 0) {
            unmatchedRequirements.add(bootstrapRequirement);
        }

        if (matchingNodes.size() > 1) {
            throw new IllegalStateException("requirement [" + bootstrapRequirement + "] matches multiple nodes: " + matchingNodes);
        }

        // At most one node is left here; a failed add() means an earlier requirement already selected it.
        for (final DiscoveryNode matchingNode : matchingNodes) {
            if (selectedNodes.add(matchingNode) == false) {
                throw new IllegalStateException("node [" + matchingNode + "] matches multiple requirements: " +
                    bootstrapRequirements.stream().filter(r -> matchesRequirement(matchingNode, r)).collect(Collectors.toList()));
            }
        }
    }

    return Tuple.tuple(selectedNodes, unmatchedRequirements);
}
}

View File

@ -34,6 +34,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
@ -188,13 +189,22 @@ public class ClusterFormationFailureHelper {
/**
 * Describes, for log messages, the quorum needed in the given voting configuration.
 *
 * The required count is a strict majority of all IDs in the configuration. Bootstrap placeholder IDs are removed
 * from the IDs shown in the description.
 *
 * @param votingConfiguration a non-empty voting configuration
 * @return a human-readable description of the nodes required for a quorum
 */
private String describeQuorum(VotingConfiguration votingConfiguration) {
    final Set<String> nodeIds = votingConfiguration.getNodeIds();
    assert nodeIds.isEmpty() == false;
    final int requiredNodes = nodeIds.size() / 2 + 1;

    final Set<String> realNodeIds = new HashSet<>(nodeIds);
    realNodeIds.removeIf(ClusterBootstrapService::isBootstrapPlaceholder);
    assert requiredNodes <= realNodeIds.size() : nodeIds;

    if (nodeIds.size() == 1) {
        return "a node with id " + realNodeIds;
    } else if (nodeIds.size() == 2) {
        return "two nodes with ids " + realNodeIds;
    } else {
        if (requiredNodes < realNodeIds.size()) {
            return "at least " + requiredNodes + " nodes with ids from " + realNodeIds;
        } else {
            // Every real node is needed, so "at least ... from" would be misleading.
            return requiredNodes + " nodes with ids " + realNodeIds;
        }
    }
}
}

View File

@ -24,7 +24,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
@ -83,7 +82,6 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
import static org.elasticsearch.discovery.DiscoverySettings.NO_MASTER_BLOCK_ID;
import static org.elasticsearch.gateway.ClusterStateUpdaters.hideStateIfNotRecovered;
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
@ -139,8 +137,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private JoinHelper.JoinAccumulator joinAccumulator;
private Optional<CoordinatorPublication> currentPublication = Optional.empty();
private final Set<ActionListener<Iterable<DiscoveryNode>>> discoveredNodesListeners = newConcurrentSet();
public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSettings, TransportService transportService,
NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService,
Supplier<CoordinationState.PersistedState> persistedStateSupplier, UnicastHostsProvider unicastHostsProvider,
@ -170,7 +166,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
this.clusterApplier = clusterApplier;
masterService.setClusterStateSupplier(this::getStateForMasterService);
this.reconfigurator = new Reconfigurator(settings, clusterSettings);
this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService);
this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService, this::getFoundPeers,
this::isInitialConfigurationSet, this::setInitialConfiguration);
this.discoveryUpgradeService = new DiscoveryUpgradeService(settings, clusterSettings, transportService,
this::isInitialConfigurationSet, joinHelper, peerFinder::getFoundPeers, this::setInitialConfiguration);
this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"),
@ -296,12 +293,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
becomeFollower("handlePublishRequest", sourceNode); // also updates preVoteCollector
}
if (isInitialConfigurationSet()) {
for (final ActionListener<Iterable<DiscoveryNode>> discoveredNodesListener : discoveredNodesListeners) {
discoveredNodesListener.onFailure(new ClusterAlreadyBootstrappedException());
}
}
return new PublishWithJoinResponse(publishResponse,
joinWithDestination(lastJoin, sourceNode, publishRequest.getAcceptedState().term()));
}
@ -598,16 +589,12 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
synchronized (mutex) {
becomeCandidate("startInitialJoin");
}
if (isInitialConfigurationSet() == false) {
clusterBootstrapService.start();
}
clusterBootstrapService.scheduleUnconfiguredBootstrap();
}
/**
 * Stops the configured-hosts resolver and then the cluster bootstrap service.
 */
@Override
protected void doStop() {
configuredHostsResolver.stop();
clusterBootstrapService.stop();
}
@Override
@ -706,21 +693,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
return getStateForMasterService().getLastAcceptedConfiguration().isEmpty() == false;
}
/**
 * Resolves the given {@link BootstrapConfiguration} against the local node and the currently found peers, then sets
 * the resulting voting configuration as the initial configuration. Calling this more than once is safe as long as
 * every call's bootstrap configuration resolves to the same set of nodes.
 *
 * @param bootstrapConfiguration A description of the nodes that should form the initial configuration.
 * @return whether this call successfully set the initial configuration - if false, the cluster has already been bootstrapped.
 */
public boolean setInitialConfiguration(final BootstrapConfiguration bootstrapConfiguration) {
    final List<DiscoveryNode> candidates = new ArrayList<>();
    candidates.add(getLocalNode());
    for (final DiscoveryNode peer : getFoundPeers()) {
        candidates.add(peer);
    }
    return setInitialConfiguration(bootstrapConfiguration.resolve(candidates));
}
/**
* Sets the initial configuration to the given {@link VotingConfiguration}. This method is safe to call
* more than once, as long as the argument to each call is the same.
@ -733,13 +705,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
final ClusterState currentState = getStateForMasterService();
if (isInitialConfigurationSet()) {
logger.debug("initial configuration already set, ignoring {}", votingConfiguration);
return false;
}
if (mode != Mode.CANDIDATE) {
throw new CoordinationStateRejectedException("Cannot set initial configuration in mode " + mode);
}
final List<DiscoveryNode> knownNodes = new ArrayList<>();
knownNodes.add(getLocalNode());
peerFinder.getFoundPeers().forEach(knownNodes::add);
@ -899,8 +868,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
public void publish(ClusterChangedEvent clusterChangedEvent, ActionListener<Void> publishListener, AckListener ackListener) {
try {
synchronized (mutex) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
if (mode != Mode.LEADER) {
logger.debug(() -> new ParameterizedMessage("[{}] failed publication as not currently leading",
clusterChangedEvent.source()));
@ -1019,9 +986,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
@Override
protected void onFoundPeersUpdated() {
final Iterable<DiscoveryNode> foundPeers;
synchronized (mutex) {
foundPeers = getFoundPeers();
final Iterable<DiscoveryNode> foundPeers = getFoundPeers();
if (mode == Mode.CANDIDATE) {
final CoordinationState.VoteCollection expectedVotes = new CoordinationState.VoteCollection();
foundPeers.forEach(expectedVotes::addVote);
@ -1039,9 +1005,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
}
}
for (final ActionListener<Iterable<DiscoveryNode>> discoveredNodesListener : discoveredNodesListeners) {
discoveredNodesListener.onResponse(foundPeers);
}
clusterBootstrapService.onFoundPeersUpdated();
}
}
@ -1076,14 +1040,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
});
}
/**
 * Registers the given listener in {@code discoveredNodesListeners}.
 *
 * @param listener the listener to register
 * @return a {@link Releasable} that removes the listener again when closed
 */
public Releasable withDiscoveryListener(ActionListener<Iterable<DiscoveryNode>> listener) {
    discoveredNodesListeners.add(listener);
    final Releasable unregister = () -> {
        final boolean wasRegistered = discoveredNodesListeners.remove(listener);
        assert wasRegistered : listener;
    };
    return unregister;
}
public Iterable<DiscoveryNode> getFoundPeers() {
// TODO everyone takes this and adds the local node. Maybe just add the local node here?
return peerFinder.getFoundPeers();

View File

@ -32,7 +32,6 @@ import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.client.AbstractClientHeadersTestCase;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.coordination.ClusterAlreadyBootstrappedException;
import org.elasticsearch.cluster.coordination.CoordinationStateRejectedException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IllegalShardRoutingStateException;
@ -809,8 +808,7 @@ public class ExceptionSerializationTests extends ESTestCase {
ids.put(148, UnknownNamedObjectException.class);
ids.put(149, MultiBucketConsumerService.TooManyBucketsException.class);
ids.put(150, CoordinationStateRejectedException.class);
ids.put(151, ClusterAlreadyBootstrappedException.class);
ids.put(152, SnapshotInProgressException.class);
ids.put(151, SnapshotInProgressException.class);
Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {

View File

@ -1,39 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.bootstrap;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration.NodeDescription;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Collections;
import static org.hamcrest.Matchers.equalTo;
/**
 * Serialization test for {@link BootstrapClusterRequest}.
 */
public class BootstrapClusterRequestTests extends ESTestCase {

    /**
     * Checks that a valid request keeps its bootstrap configuration after being copied through its writeable form.
     */
    public void testSerialization() throws IOException {
        final NodeDescription nodeDescription = new NodeDescription(null, randomAlphaOfLength(10));
        final BootstrapConfiguration expectedConfiguration = new BootstrapConfiguration(Collections.singletonList(nodeDescription));
        final BootstrapClusterRequest request = new BootstrapClusterRequest(expectedConfiguration);
        assertNull(request.validate());

        final BootstrapClusterRequest roundTripped = copyWriteable(request, writableRegistry(), BootstrapClusterRequest::new);
        assertThat(roundTripped.getBootstrapConfiguration(), equalTo(expectedConfiguration));
    }
}

View File

@ -1,33 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.bootstrap;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.hamcrest.Matchers.equalTo;
/**
 * Serialization test for {@link BootstrapClusterResponse}.
 */
public class BootstrapClusterResponseTests extends ESTestCase {

    /**
     * Checks that the already-bootstrapped flag is unchanged after copying a response through its writeable form.
     */
    public void testSerialization() throws IOException {
        final boolean alreadyBootstrapped = randomBoolean();
        final BootstrapClusterResponse response = new BootstrapClusterResponse(alreadyBootstrapped);
        final BootstrapClusterResponse roundTripped = copyWriteable(response, writableRegistry(), BootstrapClusterResponse::new);
        assertThat(roundTripped.getAlreadyBootstrapped(), equalTo(response.getAlreadyBootstrapped()));
    }
}

View File

@ -1,182 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.bootstrap;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration.NodeDescription;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;
import org.elasticsearch.test.EqualsHashCodeTestUtils.CopyFunction;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singleton;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.startsWith;
/**
 * Tests for {@link BootstrapConfiguration} and its {@link NodeDescription} entries: equality, hashing and serialization,
 * plus resolution of descriptions against a list of discovered nodes.
 */
public class BootstrapConfigurationTests extends ESTestCase {

    /**
     * Checks the equals/hashCode contract using a serialization round trip as the copy function and
     * {@link #mutate(BootstrapConfiguration)} as the mutation function.
     */
    public void testEqualsHashcodeSerialization() {
        // Note: the explicit cast of the CopyFunction is needed for some IDE (specifically Eclipse 4.8.0) to infer the right type
        EqualsHashCodeTestUtils.checkEqualsAndHashCode(randomBootstrapConfiguration(),
            (CopyFunction<BootstrapConfiguration>) bootstrapConfiguration -> copyWriteable(bootstrapConfiguration, writableRegistry(),
                BootstrapConfiguration::new),
            this::mutate);
    }

    /** A description with a null id and a node's name resolves to that node. */
    public void testNodeDescriptionResolvedByName() {
        final List<DiscoveryNode> discoveryNodes = randomDiscoveryNodes();
        final DiscoveryNode expectedNode = randomFrom(discoveryNodes);
        assertThat(new NodeDescription(null, expectedNode.getName()).resolve(discoveryNodes), equalTo(expectedNode));
    }

    /** A description built directly from a node resolves to that node. */
    public void testNodeDescriptionResolvedByIdAndName() {
        final List<DiscoveryNode> discoveryNodes = randomDiscoveryNodes();
        final DiscoveryNode expectedNode = randomFrom(discoveryNodes);
        assertThat(new NodeDescription(expectedNode).resolve(discoveryNodes), equalTo(expectedNode));
    }

    /** A description whose name matches a node but whose id does not is rejected with a "node id mismatch" error. */
    public void testRejectsMismatchedId() {
        final List<DiscoveryNode> discoveryNodes = randomDiscoveryNodes();
        final DiscoveryNode expectedNode = randomFrom(discoveryNodes);
        // ids of the discovered nodes have length 10, so an id of length 11 cannot equal any of them
        final ElasticsearchException e = expectThrows(ElasticsearchException.class,
            () -> new NodeDescription(randomAlphaOfLength(11), expectedNode.getName()).resolve(discoveryNodes));
        assertThat(e.getMessage(), startsWith("node id mismatch comparing "));
    }

    /** A description whose id matches a node but whose name does not is rejected with a "node name mismatch" error. */
    public void testRejectsMismatchedName() {
        final List<DiscoveryNode> discoveryNodes = randomDiscoveryNodes();
        final DiscoveryNode expectedNode = randomFrom(discoveryNodes);
        // names of the discovered nodes have length 10, so a name of length 11 cannot equal any of them
        final ElasticsearchException e = expectThrows(ElasticsearchException.class,
            () -> new NodeDescription(expectedNode.getId(), randomAlphaOfLength(11)).resolve(discoveryNodes));
        assertThat(e.getMessage(), startsWith("node name mismatch comparing "));
    }

    /** Resolving a freshly generated random description against unrelated nodes fails with a "no node matching" error. */
    public void testFailsIfNoMatch() {
        final List<DiscoveryNode> discoveryNodes = randomDiscoveryNodes();
        final ElasticsearchException e = expectThrows(ElasticsearchException.class,
            () -> randomNodeDescription().resolve(discoveryNodes));
        assertThat(e.getMessage(), startsWith("no node matching "));
    }

    /** A name-only description is rejected when a second node carrying the same name (but a fresh id) is present. */
    public void testFailsIfDuplicateMatchOnName() {
        final List<DiscoveryNode> discoveryNodes = randomDiscoveryNodes();
        final DiscoveryNode discoveryNode = randomFrom(discoveryNodes);
        discoveryNodes.add(new DiscoveryNode(discoveryNode.getName(), randomAlphaOfLength(10), buildNewFakeTransportAddress(), emptyMap(),
            singleton(Role.MASTER), Version.CURRENT));
        final ElasticsearchException e = expectThrows(ElasticsearchException.class,
            () -> new NodeDescription(null, discoveryNode.getName()).resolve(discoveryNodes));
        assertThat(e.getMessage(), startsWith("discovered multiple nodes matching "));
    }

    /** A description is rejected when the very same node appears twice in the list of discovered nodes. */
    public void testFailsIfDuplicatedNode() {
        final List<DiscoveryNode> discoveryNodes = randomDiscoveryNodes();
        final DiscoveryNode discoveryNode = randomFrom(discoveryNodes);
        discoveryNodes.add(discoveryNode);
        final ElasticsearchException e = expectThrows(ElasticsearchException.class,
            () -> new NodeDescription(discoveryNode).resolve(discoveryNodes));
        assertThat(e.getMessage(), startsWith("discovered multiple nodes matching "));
    }

    /**
     * A configuration describing a random non-empty subset of the discovered nodes (each either by id and name, or by
     * name only) resolves to a voting configuration containing exactly the ids of that subset.
     */
    public void testResolvesEntireConfiguration() {
        final List<DiscoveryNode> discoveryNodes = randomDiscoveryNodes();
        final List<DiscoveryNode> selectedNodes = randomSubsetOf(randomIntBetween(1, discoveryNodes.size()), discoveryNodes);
        final BootstrapConfiguration bootstrapConfiguration = new BootstrapConfiguration(selectedNodes.stream()
            .map(discoveryNode -> randomBoolean() ? new NodeDescription(discoveryNode) : new NodeDescription(null, discoveryNode.getName()))
            .collect(Collectors.toList()));

        final VotingConfiguration expectedConfiguration
            = new VotingConfiguration(selectedNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet()));
        final VotingConfiguration votingConfiguration = bootstrapConfiguration.resolve(discoveryNodes);
        assertThat(votingConfiguration, equalTo(expectedConfiguration));
    }

    /**
     * A configuration in which two descriptions refer to the same node (either the identical description twice, or a
     * name-only variant of it) is rejected with a "multiple nodes matching" error.
     */
    public void testRejectsDuplicatedDescriptions() {
        final List<DiscoveryNode> discoveryNodes = randomDiscoveryNodes();
        final List<DiscoveryNode> selectedNodes = randomSubsetOf(randomIntBetween(1, discoveryNodes.size()), discoveryNodes);
        final List<NodeDescription> selectedNodeDescriptions = selectedNodes.stream()
            .map(discoveryNode -> randomBoolean() ? new NodeDescription(discoveryNode) : new NodeDescription(null, discoveryNode.getName()))
            .collect(Collectors.toList());
        final NodeDescription toDuplicate = randomFrom(selectedNodeDescriptions);
        selectedNodeDescriptions.add(randomBoolean() ? toDuplicate : new NodeDescription(null, toDuplicate.getName()));
        final BootstrapConfiguration bootstrapConfiguration = new BootstrapConfiguration(selectedNodeDescriptions);

        final ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> bootstrapConfiguration.resolve(discoveryNodes));
        assertThat(e.getMessage(), startsWith("multiple nodes matching "));
    }

    /**
     * Returns a description that differs from {@code original} in either its name or its id.
     *
     * Replacement strings have length {@code 21 - n} where {@code n} is the length of the value being replaced; since
     * {@code 21 - n} can never equal {@code n}, the replacement always differs from the original value.
     */
    private NodeDescription mutate(NodeDescription original) {
        if (randomBoolean()) {
            // change the name, keep the id
            return new NodeDescription(original.getId(), randomAlphaOfLength(21 - original.getName().length()));
        } else {
            // change the id, keep the name
            if (original.getId() == null) {
                return new NodeDescription(randomAlphaOfLength(10), original.getName());
            } else if (randomBoolean()) {
                return new NodeDescription(randomAlphaOfLength(21 - original.getId().length()), original.getName());
            } else {
                return new NodeDescription(null, original.getName());
            }
        }
    }

    /**
     * Returns a configuration that differs from {@code original} by one inserted, removed or mutated description.
     * A description is only removed when more than one is present, so the result is never empty.
     */
    protected BootstrapConfiguration mutate(BootstrapConfiguration original) {
        final List<NodeDescription> newDescriptions = new ArrayList<>(original.getNodeDescriptions());
        final int mutateElement = randomIntBetween(0, newDescriptions.size());
        if (mutateElement == newDescriptions.size()) {
            // index one-past-the-end means "insert a new description at a random position"
            newDescriptions.add(randomIntBetween(0, newDescriptions.size()), randomNodeDescription());
        } else {
            if (newDescriptions.size() > 1 && randomBoolean()) {
                newDescriptions.remove(mutateElement);
            } else {
                newDescriptions.set(mutateElement, mutate(newDescriptions.get(mutateElement)));
            }
        }
        return new BootstrapConfiguration(newDescriptions);
    }

    /** Returns a description with a random 10-character name and either a null id or a random 10-character id. */
    protected NodeDescription randomNodeDescription() {
        return new NodeDescription(randomBoolean() ? null : randomAlphaOfLength(10), randomAlphaOfLength(10));
    }

    /** Returns a configuration holding between 1 and 5 random node descriptions. */
    protected BootstrapConfiguration randomBootstrapConfiguration() {
        final int size = randomIntBetween(1, 5);
        final List<NodeDescription> nodeDescriptions = new ArrayList<>(size);
        // strictly-less-than: the list must end up with exactly `size` entries, matching its presized capacity
        while (nodeDescriptions.size() < size) {
            nodeDescriptions.add(randomNodeDescription());
        }
        return new BootstrapConfiguration(nodeDescriptions);
    }

    /** Returns a mutable list of between 1 and 5 master-role nodes with random 10-character names and ids. */
    protected List<DiscoveryNode> randomDiscoveryNodes() {
        final int size = randomIntBetween(1, 5);
        final List<DiscoveryNode> nodes = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            nodes.add(new DiscoveryNode(randomAlphaOfLength(10), randomAlphaOfLength(10), buildNewFakeTransportAddress(), emptyMap(),
                singleton(Role.MASTER), Version.CURRENT));
        }
        return nodes;
    }
}

View File

@ -1,64 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.bootstrap;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.hamcrest.core.Is.is;
/**
 * Tests the timeout handling and wire serialization of {@link GetDiscoveredNodesRequest}.
 */
public class GetDiscoveredNodesRequestTests extends ESTestCase {

    /**
     * Walks the timeout through its default, an updated value, a rejected negative value and finally {@code null}.
     */
    public void testTimeoutValidation() {
        final GetDiscoveredNodesRequest request = new GetDiscoveredNodesRequest();
        assertThat("default value is 30s", request.getTimeout(), is(TimeValue.timeValueSeconds(30)));

        final TimeValue updatedTimeout = TimeValue.parseTimeValue(randomTimeValue(), "timeout");
        request.setTimeout(updatedTimeout);
        assertThat("value updated", request.getTimeout(), equalTo(updatedTimeout));

        // a negative timeout must be rejected with a descriptive message
        final IllegalArgumentException rejection = expectThrows(IllegalArgumentException.class,
            () -> request.setTimeout(TimeValue.timeValueNanos(randomLongBetween(-10, -1))));
        final String rejectionMessage = rejection.getMessage();
        assertThat(rejectionMessage, startsWith("negative timeout of "));
        assertThat(rejectionMessage, endsWith(" is not allowed"));

        // a null timeout is accepted
        request.setTimeout(null);
        assertThat("value updated", request.getTimeout(), nullValue());
    }

    /**
     * Asserts that the timeout is unchanged after a write/read round trip, whether it was left at its default, set to a
     * random value, or set to {@code null}.
     */
    public void testSerialization() throws IOException {
        final GetDiscoveredNodesRequest request = new GetDiscoveredNodesRequest();
        if (randomBoolean()) {
            request.setTimeout(TimeValue.parseTimeValue(randomTimeValue(), "timeout"));
        } else {
            if (randomBoolean()) {
                request.setTimeout(null);
            }
        }

        final GetDiscoveredNodesRequest roundTripped = copyWriteable(request, writableRegistry(), GetDiscoveredNodesRequest::new);
        assertThat(roundTripped.getTimeout(), equalTo(request.getTimeout()));
    }
}

View File

@ -1,59 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.bootstrap;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singleton;
import static org.hamcrest.Matchers.equalTo;
/**
 * Tests wire serialization of {@link GetDiscoveredNodesResponse} and its conversion to a bootstrap configuration.
 */
public class GetDiscoveredNodesResponseTests extends ESTestCase {

    /**
     * Asserts that the set of nodes is unchanged after a write/read round trip.
     */
    public void testSerialization() throws IOException {
        final GetDiscoveredNodesResponse response = new GetDiscoveredNodesResponse(randomDiscoveryNodeSet());
        final GetDiscoveredNodesResponse roundTripped = copyWriteable(response, writableRegistry(), GetDiscoveredNodesResponse::new);
        assertThat(roundTripped.getNodes(), equalTo(response.getNodes()));
    }

    /**
     * Returns a set of between 1 and 10 randomly generated master-role nodes; fails if a generated node is a duplicate.
     */
    private Set<DiscoveryNode> randomDiscoveryNodeSet() {
        final int targetSize = randomIntBetween(1, 10);
        final Set<DiscoveryNode> generated = new HashSet<>(targetSize);
        // every successful iteration adds exactly one node, so this runs targetSize times
        for (int i = 0; i < targetSize; i++) {
            final DiscoveryNode candidate = new DiscoveryNode(randomAlphaOfLength(10), randomAlphaOfLength(10),
                UUIDs.randomBase64UUID(random()), randomAlphaOfLength(10), randomAlphaOfLength(10), buildNewFakeTransportAddress(),
                emptyMap(), singleton(Role.MASTER), Version.CURRENT);
            assertTrue(generated.add(candidate));
        }
        return generated;
    }

    /**
     * Asserts that the bootstrap configuration derived from a response resolves, against the same nodes, to exactly the
     * ids of those nodes.
     */
    public void testConversionToBootstrapConfiguration() {
        final Set<DiscoveryNode> discoveredNodes = randomDiscoveryNodeSet();
        final BootstrapConfiguration bootstrapConfiguration = new GetDiscoveredNodesResponse(discoveredNodes).getBootstrapConfiguration();
        final Set<String> expectedIds = discoveredNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet());
        assertThat(bootstrapConfiguration.resolve(discoveredNodes).getNodeIds(), equalTo(expectedIds));
    }
}

View File

@ -1,233 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.bootstrap;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration.NodeDescription;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.InMemoryPersistedState;
import org.elasticsearch.cluster.coordination.NoOpClusterApplier;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.io.IOException;
import java.util.Collections;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.singletonList;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyZeroInteractions;
/**
 * Tests for {@link TransportBootstrapClusterAction}: each test registers the action on a mock-backed
 * {@link TransportService}, sends a {@link BootstrapClusterRequest} to the local node, and inspects the outcome.
 */
public class TransportBootstrapClusterActionTests extends ESTestCase {

    private static final ActionFilters EMPTY_FILTERS = new ActionFilters(emptySet());

    // returned by the transport service's local-node factory; tests may replace it before the service is started
    private DiscoveryNode discoveryNode;
    // shared by all tests in this class: created in createThreadPool() and shut down in shutdownThreadPool()
    private static ThreadPool threadPool;
    private TransportService transportService;
    private Coordinator coordinator;

    /** Returns a request carrying a single fixed node description ("id", "name"). */
    private static BootstrapClusterRequest exampleRequest() {
        return new BootstrapClusterRequest(new BootstrapConfiguration(singletonList(new NodeDescription("id", "name"))));
    }

    @BeforeClass
    public static void createThreadPool() {
        threadPool = new TestThreadPool("test", Settings.EMPTY);
    }

    @AfterClass
    public static void shutdownThreadPool() {
        threadPool.shutdown();
    }

    /** Builds a fresh local node, mock-backed transport service and coordinator for every test. */
    @Before
    public void setupTest() {
        discoveryNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT);
        final MockTransport transport = new MockTransport();
        transportService = transport.createTransportService(Settings.EMPTY, threadPool,
            TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> discoveryNode, null, emptySet());

        final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
        coordinator = new Coordinator("local", Settings.EMPTY, clusterSettings, transportService, writableRegistry(),
            ESAllocationTestCase.createAllocationService(Settings.EMPTY),
            new MasterService("local", Settings.EMPTY, threadPool),
            () -> new InMemoryPersistedState(0, ClusterState.builder(new ClusterName("cluster")).build()), r -> emptyList(),
            new NoOpClusterApplier(), Collections.emptyList(), new Random(random().nextLong()));
    }

    /**
     * When the discovery implementation is not a {@link Coordinator}, the request must fail with an
     * {@link IllegalArgumentException} naming the configured discovery type, and the discovery object must not be touched.
     */
    public void testHandlesNonstandardDiscoveryImplementation() throws InterruptedException {
        final Discovery discovery = mock(Discovery.class);

        final String nonstandardDiscoveryType = randomFrom(DiscoveryModule.ZEN_DISCOVERY_TYPE, "single-node", "unknown");
        new TransportBootstrapClusterAction(
            Settings.builder().put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), nonstandardDiscoveryType).build(),
            EMPTY_FILTERS, transportService, discovery); // registers action
        transportService.start();
        transportService.acceptIncomingRequests();

        final CountDownLatch countDownLatch = new CountDownLatch(1);
        transportService.sendRequest(discoveryNode, BootstrapClusterAction.NAME, exampleRequest(), new ResponseHandler() {
            @Override
            public void handleResponse(BootstrapClusterResponse response) {
                throw new AssertionError("should not be called");
            }

            @Override
            public void handleException(TransportException exp) {
                final Throwable rootCause = exp.getRootCause();
                assertThat(rootCause, instanceOf(IllegalArgumentException.class));
                assertThat(rootCause.getMessage(), equalTo("cluster bootstrapping is not supported by discovery type [" +
                    nonstandardDiscoveryType + "]"));
                countDownLatch.countDown();
            }
        });

        assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
        // verify only after the request has completed; checking straight after creating the mock can never fail
        verifyZeroInteractions(discovery);
    }

    /**
     * When the local node has no roles (and so is not master-eligible), the request must fail with an
     * {@link IllegalArgumentException} explaining that bootstrapping needs a master-eligible node.
     */
    public void testFailsOnNonMasterEligibleNodes() throws InterruptedException {
        discoveryNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
        // transport service only picks up local node when started, so we can change it here ^

        new TransportBootstrapClusterAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action
        transportService.start();
        transportService.acceptIncomingRequests();
        coordinator.start();

        final CountDownLatch countDownLatch = new CountDownLatch(1);
        transportService.sendRequest(discoveryNode, BootstrapClusterAction.NAME, exampleRequest(), new ResponseHandler() {
            @Override
            public void handleResponse(BootstrapClusterResponse response) {
                throw new AssertionError("should not be called");
            }

            @Override
            public void handleException(TransportException exp) {
                final Throwable rootCause = exp.getRootCause();
                assertThat(rootCause, instanceOf(IllegalArgumentException.class));
                assertThat(rootCause.getMessage(),
                    equalTo("this node is not master-eligible, but cluster bootstrapping can only happen on a master-eligible node"));
                countDownLatch.countDown();
            }
        });

        assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
    }

    /**
     * Sends ten identical bootstrap requests back to back and expects exactly one of them to report that the cluster
     * was not already bootstrapped; a further request sent afterwards must report that it was.
     */
    public void testSetsInitialConfiguration() throws InterruptedException {
        new TransportBootstrapClusterAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action
        transportService.start();
        transportService.acceptIncomingRequests();
        coordinator.start();
        coordinator.startInitialJoin();

        assertFalse(coordinator.isInitialConfigurationSet());

        final BootstrapClusterRequest request
            = new BootstrapClusterRequest(new BootstrapConfiguration(singletonList(new NodeDescription(discoveryNode))));

        {
            final int parallelRequests = 10;
            final CountDownLatch countDownLatch = new CountDownLatch(parallelRequests);
            final AtomicInteger successes = new AtomicInteger();

            for (int i = 0; i < parallelRequests; i++) {
                transportService.sendRequest(discoveryNode, BootstrapClusterAction.NAME, request, new ResponseHandler() {
                    @Override
                    public void handleResponse(BootstrapClusterResponse response) {
                        // only the request that actually set the initial configuration counts as a success
                        if (response.getAlreadyBootstrapped() == false) {
                            successes.incrementAndGet();
                        }
                        countDownLatch.countDown();
                    }

                    @Override
                    public void handleException(TransportException exp) {
                        throw new AssertionError("should not be called", exp);
                    }
                });
            }

            assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
            assertThat(successes.get(), equalTo(1));
        }

        assertTrue(coordinator.isInitialConfigurationSet());

        {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            transportService.sendRequest(discoveryNode, BootstrapClusterAction.NAME, request, new ResponseHandler() {
                @Override
                public void handleResponse(BootstrapClusterResponse response) {
                    assertTrue(response.getAlreadyBootstrapped());
                    countDownLatch.countDown();
                }

                @Override
                public void handleException(TransportException exp) {
                    throw new AssertionError("should not be called", exp);
                }
            });

            assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
        }
    }

    /**
     * Base response handler that runs on the calling thread ({@link Names#SAME}) and deserializes a
     * {@link BootstrapClusterResponse}; subclasses supply the success and failure callbacks.
     */
    private abstract static class ResponseHandler implements TransportResponseHandler<BootstrapClusterResponse> {
        @Override
        public String executor() {
            return Names.SAME;
        }

        @Override
        public BootstrapClusterResponse read(StreamInput in) throws IOException {
            return new BootstrapClusterResponse(in);
        }
    }
}

View File

@ -1,533 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.bootstrap;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.coordination.ClusterAlreadyBootstrappedException;
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
import org.elasticsearch.cluster.coordination.CoordinationMetaData;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.InMemoryPersistedState;
import org.elasticsearch.cluster.coordination.NoOpClusterApplier;
import org.elasticsearch.cluster.coordination.PeersResponse;
import org.elasticsearch.cluster.coordination.PublicationTransportHandler;
import org.elasticsearch.cluster.coordination.PublishWithJoinResponse;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.PeersRequest;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.BytesTransportRequest;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportService.HandshakeResponse;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING;
import static org.elasticsearch.discovery.PeerFinder.REQUEST_PEERS_ACTION_NAME;
import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.startsWith;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyZeroInteractions;
public class TransportGetDiscoveredNodesActionTests extends ESTestCase {
// action filters built from an empty set, passed to every action constructed by these tests
private static final ActionFilters EMPTY_FILTERS = new ActionFilters(emptySet());
// shared by all tests in this class: created in createThreadPool() and shut down in shutdownThreadPool()
private static ThreadPool threadPool;
// returned by the transport service's local-node factory; tests may replace it before the service is started
private DiscoveryNode localNode;
// random per-test cluster name used for the transport settings, handshake responses and the initial cluster state
private String clusterName;
private TransportService transportService;
private Coordinator coordinator;
// a second node; the mock transport answers handshake requests sent to this node's address
private DiscoveryNode otherNode;
/**
 * Creates the static thread pool once, before any test in this class runs.
 */
@BeforeClass
public static void createThreadPool() {
threadPool = new TestThreadPool("test", Settings.EMPTY);
}
/**
 * Shuts the static thread pool down once all tests in this class have run.
 */
@AfterClass
public static void shutdownThreadPool() {
threadPool.shutdown();
}
/**
 * Prepares a fresh cluster name, a local and a remote node, a mock-backed transport service and a coordinator for
 * every test. The mock transport replies only to handshake requests addressed to the remote node.
 */
@Before
public void setupTest() {
    clusterName = randomAlphaOfLength(10);
    localNode = new DiscoveryNode(
        "node1", "local", buildNewFakeTransportAddress(), emptyMap(), EnumSet.allOf(DiscoveryNode.Role.class), Version.CURRENT);
    otherNode = new DiscoveryNode(
        "node2", "other", buildNewFakeTransportAddress(), emptyMap(), EnumSet.allOf(DiscoveryNode.Role.class), Version.CURRENT);

    final MockTransport handshakeOnlyTransport = new MockTransport() {
        @Override
        protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
            // anything other than a handshake is left unanswered
            if (action.equals(HANDSHAKE_ACTION_NAME) == false) {
                return;
            }
            // handshakes are only answered on behalf of the remote node
            if (node.getAddress().equals(otherNode.getAddress()) == false) {
                return;
            }
            handleResponse(requestId, new HandshakeResponse(otherNode, new ClusterName(clusterName), Version.CURRENT));
        }
    };

    final Settings transportSettings = Settings.builder().put(CLUSTER_NAME_SETTING.getKey(), clusterName).build();
    transportService = handshakeOnlyTransport.createTransportService(transportSettings, threadPool,
        TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet());

    // explicitly setting the initial master nodes to their default value suppresses auto-bootstrap
    final Settings.Builder coordinatorSettings = Settings.builder();
    coordinatorSettings.putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(),
        ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY));
    final Settings settings = coordinatorSettings.build();

    final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
    coordinator = new Coordinator("local", settings, clusterSettings, transportService, writableRegistry(),
        ESAllocationTestCase.createAllocationService(settings),
        new MasterService("local", settings, threadPool),
        () -> new InMemoryPersistedState(0, ClusterState.builder(new ClusterName(clusterName)).build()),
        ignoredResolver -> emptyList(),
        new NoOpClusterApplier(), Collections.emptyList(), new Random(random().nextLong()));
}
/**
 * When the discovery implementation is not a {@link Coordinator}, the request must fail with an
 * {@link IllegalArgumentException} naming the configured discovery type, and the discovery object must not be touched.
 */
public void testHandlesNonstandardDiscoveryImplementation() throws InterruptedException {
    final Discovery discovery = mock(Discovery.class);

    final String nonstandardDiscoveryType = randomFrom(DiscoveryModule.ZEN_DISCOVERY_TYPE, "single-node", "unknown");
    new TransportGetDiscoveredNodesAction(
        Settings.builder().put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), nonstandardDiscoveryType).build(),
        EMPTY_FILTERS, transportService, discovery); // registers action
    transportService.start();
    transportService.acceptIncomingRequests();

    final CountDownLatch countDownLatch = new CountDownLatch(1);
    transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, new GetDiscoveredNodesRequest(), new ResponseHandler() {
        @Override
        public void handleResponse(GetDiscoveredNodesResponse response) {
            throw new AssertionError("should not be called");
        }

        @Override
        public void handleException(TransportException exp) {
            final Throwable rootCause = exp.getRootCause();
            assertThat(rootCause, instanceOf(IllegalArgumentException.class));
            assertThat(rootCause.getMessage(), equalTo("discovered nodes are not exposed by discovery type [" +
                nonstandardDiscoveryType + "]"));
            countDownLatch.countDown();
        }
    });

    assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
    // verify only after the request has completed; checking straight after creating the mock can never fail
    verifyZeroInteractions(discovery);
}
/**
 * When the local node has no roles (and so is not master-eligible), the request must fail with an
 * {@link IllegalArgumentException} explaining that discovered nodes are only exposed by master-eligible nodes.
 */
public void testFailsOnMasterIneligibleNodes() throws InterruptedException {
    // the transport service only picks up the local node when started, so it can still be swapped at this point
    localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);

    new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action
    transportService.start();
    transportService.acceptIncomingRequests();
    coordinator.start();

    final CountDownLatch failureReceived = new CountDownLatch(1);
    final ResponseHandler expectIneligibleFailure = new ResponseHandler() {
        @Override
        public void handleResponse(GetDiscoveredNodesResponse response) {
            throw new AssertionError("should not be called");
        }

        @Override
        public void handleException(TransportException exp) {
            final Throwable cause = exp.getRootCause();
            assertThat(cause, instanceOf(IllegalArgumentException.class));
            assertThat(cause.getMessage(),
                equalTo("this node is not master-eligible, but discovered nodes are only exposed by master-eligible nodes"));
            failureReceived.countDown();
        }
    };
    transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, new GetDiscoveredNodesRequest(), expectIneligibleFailure);

    assertTrue(failureReceived.await(10, TimeUnit.SECONDS));
}
// Sends two requests that both require the never-discovered name "not-a-node":
//  - one with a null timeout, whose handler fails the test if it is ever invoked;
//  - one with a zero timeout, which must fail with an ElasticsearchTimeoutException.
public void testFailsQuicklyWithZeroTimeoutAndAcceptsNullTimeout() throws InterruptedException {
    new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action
    transportService.start();
    transportService.acceptIncomingRequests();
    coordinator.start();
    coordinator.startInitialJoin();

    // Request without a timeout: neither a response nor a failure is acceptable.
    final GetDiscoveredNodesRequest untimedRequest = new GetDiscoveredNodesRequest();
    untimedRequest.setTimeout(null);
    untimedRequest.setRequiredNodes(singletonList("not-a-node"));
    transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, untimedRequest, new ResponseHandler() {
        @Override
        public void handleResponse(GetDiscoveredNodesResponse response) {
            throw new AssertionError("should not be called");
        }

        @Override
        public void handleException(TransportException exp) {
            throw new AssertionError("should not be called", exp);
        }
    });

    // Request with a zero timeout: must be failed with a timeout exception.
    final GetDiscoveredNodesRequest zeroTimeoutRequest = new GetDiscoveredNodesRequest();
    zeroTimeoutRequest.setTimeout(TimeValue.ZERO);
    zeroTimeoutRequest.setRequiredNodes(singletonList("not-a-node"));
    final CountDownLatch timedOut = new CountDownLatch(1);
    transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, zeroTimeoutRequest, new ResponseHandler() {
        @Override
        public void handleResponse(GetDiscoveredNodesResponse response) {
            throw new AssertionError("should not be called");
        }

        @Override
        public void handleException(TransportException exp) {
            final Throwable cause = exp.getRootCause();
            assertThat(cause, instanceOf(ElasticsearchTimeoutException.class));
            assertThat(cause.getMessage(), startsWith("timed out while waiting for GetDiscoveredNodesRequest{"));
            timedOut.countDown();
        }
    });
    assertTrue(timedOut.await(10, TimeUnit.SECONDS));
}
// After an initial configuration containing the local node has been set on the coordinator, a
// discovered-nodes request must fail with a ClusterAlreadyBootstrappedException as its root cause.
public void testFailsIfAlreadyBootstrapped() throws InterruptedException {
    new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action
    transportService.start();
    transportService.acceptIncomingRequests();
    coordinator.start();
    coordinator.startInitialJoin();
    coordinator.setInitialConfiguration(new VotingConfiguration(singleton(localNode.getId())));

    final CountDownLatch rejected = new CountDownLatch(1);
    final GetDiscoveredNodesRequest request = new GetDiscoveredNodesRequest();
    request.setTimeout(null);
    transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, request, new ResponseHandler() {
        @Override
        public void handleResponse(GetDiscoveredNodesResponse response) {
            throw new AssertionError("should not be called");
        }

        @Override
        public void handleException(TransportException exp) {
            // Any failure other than "already bootstrapped" is unexpected.
            if (exp.getRootCause() instanceof ClusterAlreadyBootstrappedException == false) {
                throw new AssertionError("should not be called", exp);
            }
            rejected.countDown();
        }
    });
    assertTrue(rejected.await(10, TimeUnit.SECONDS));
}
// Leaves a discovered-nodes request (null timeout, unsatisfiable requirement) pending, then publishes to the
// local node a full cluster state whose master is otherNode and whose accepted and committed voting
// configurations both contain only otherNode. The pending request must then fail with a
// ClusterAlreadyBootstrappedException as its root cause.
public void testFailsIfAcceptsClusterStateWithNonemptyConfiguration() throws InterruptedException, IOException {
    new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action
    transportService.start();
    transportService.acceptIncomingRequests();
    coordinator.start();
    coordinator.startInitialJoin();

    final CountDownLatch rejected = new CountDownLatch(1);
    final GetDiscoveredNodesRequest pendingRequest = new GetDiscoveredNodesRequest();
    pendingRequest.setTimeout(null);
    pendingRequest.setRequiredNodes(singletonList("not-a-node"));
    transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, pendingRequest, new ResponseHandler() {
        @Override
        public void handleResponse(GetDiscoveredNodesResponse response) {
            throw new AssertionError("should not be called");
        }

        @Override
        public void handleException(TransportException exp) {
            if (exp.getRootCause() instanceof ClusterAlreadyBootstrappedException == false) {
                throw new AssertionError("should not be called", exp);
            }
            rejected.countDown();
        }
    });

    // Assemble the cluster state to publish, piece by piece.
    final ClusterState.Builder stateBuilder = ClusterState.builder(ClusterName.DEFAULT);
    stateBuilder.incrementVersion();
    final DiscoveryNodes.Builder nodes = DiscoveryNodes.builder()
        .add(localNode)
        .add(otherNode)
        .localNodeId(localNode.getId())
        .masterNodeId(otherNode.getId());
    stateBuilder.nodes(nodes);
    final CoordinationMetaData coordinationMetaData = CoordinationMetaData.builder()
        .term(1)
        .lastAcceptedConfiguration(new VotingConfiguration(singleton(otherNode.getId())))
        .lastCommittedConfiguration(new VotingConfiguration(singleton(otherNode.getId())))
        .build();
    stateBuilder.metaData(MetaData.builder().coordinationMetaData(coordinationMetaData));

    final BytesTransportRequest publishRequest = new BytesTransportRequest(
        PublicationTransportHandler.serializeFullClusterState(stateBuilder.build(), Version.CURRENT), Version.CURRENT);
    transportService.sendRequest(localNode, PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME, publishRequest,
        new TransportResponseHandler<PublishWithJoinResponse>() {
            @Override
            public PublishWithJoinResponse read(StreamInput in) throws IOException {
                return new PublishWithJoinResponse(in);
            }

            @Override
            public String executor() {
                return Names.SAME;
            }

            @Override
            public void handleResponse(PublishWithJoinResponse response) {
                // do nothing
            }

            @Override
            public void handleException(TransportException exp) {
                throw new AssertionError("should not be called", exp);
            }
        });

    assertTrue(rejected.await(10, TimeUnit.SECONDS));
}
// Once setup has completed, a request with a zero timeout and no explicit requirements must succeed.
public void testGetsDiscoveredNodesWithZeroTimeout() throws InterruptedException {
    setupGetDiscoveredNodesAction();

    final GetDiscoveredNodesRequest request = new GetDiscoveredNodesRequest();
    request.setTimeout(TimeValue.ZERO);
    assertWaitConditionMet(request);
}
// Requires both nodes by the string form of their transport addresses and expects an immediate success.
public void testGetsDiscoveredNodesByAddress() throws InterruptedException {
    setupGetDiscoveredNodesAction();

    final GetDiscoveredNodesRequest request = new GetDiscoveredNodesRequest();
    final String localAddress = localNode.getAddress().toString();
    final String otherAddress = otherNode.getAddress().toString();
    request.setRequiredNodes(Arrays.asList(localAddress, otherAddress));
    request.setTimeout(TimeValue.ZERO);
    assertWaitConditionMet(request);
}
// Requires both nodes by node name and expects an immediate success.
public void testGetsDiscoveredNodesByName() throws InterruptedException {
    setupGetDiscoveredNodesAction();

    final GetDiscoveredNodesRequest request = new GetDiscoveredNodesRequest();
    final String localName = localNode.getName();
    final String otherName = otherNode.getName();
    request.setRequiredNodes(Arrays.asList(localName, otherName));
    request.setTimeout(TimeValue.ZERO);
    assertWaitConditionMet(request);
}
// Requires a single node by the bare address string of the local node and expects the request to be
// rejected with a message beginning "[<ip>] matches [".
public void testGetsDiscoveredNodesByIP() throws InterruptedException {
    setupGetDiscoveredNodesAction();

    final String localIp = localNode.getAddress().getAddress();
    final GetDiscoveredNodesRequest request = new GetDiscoveredNodesRequest();
    request.setRequiredNodes(Collections.singletonList(localIp));
    request.setTimeout(TimeValue.ZERO);

    final String expectedPrefix = '[' + localIp + "] matches [";
    assertWaitConditionFailedOnDuplicate(request, expectedPrefix);
}
// Lists the local node's name twice and expects the request to be rejected with a message naming the node
// and both matching requirements.
public void testGetsDiscoveredNodesDuplicateName() throws InterruptedException {
    setupGetDiscoveredNodesAction();

    final String localName = localNode.getName();
    final GetDiscoveredNodesRequest request = new GetDiscoveredNodesRequest();
    request.setRequiredNodes(Arrays.asList(localName, localName));
    request.setTimeout(TimeValue.ZERO);

    final String expectedPrefix = "[" + localNode + "] matches [" + localName + ", " + localName + ']';
    assertWaitConditionFailedOnDuplicate(request, expectedPrefix);
}
// Lists the local node once by address and once by name, and expects the request to be rejected with a
// message beginning "[<local node>] matches [".
public void testGetsDiscoveredNodesWithDuplicateMatchNameAndAddress() throws InterruptedException {
    setupGetDiscoveredNodesAction();

    final String byAddress = localNode.getAddress().toString();
    final String byName = localNode.getName();
    final GetDiscoveredNodesRequest request = new GetDiscoveredNodesRequest();
    request.setRequiredNodes(Arrays.asList(byAddress, byName));
    request.setTimeout(TimeValue.ZERO);

    final String expectedPrefix = "[" + localNode + "] matches [";
    assertWaitConditionFailedOnDuplicate(request, expectedPrefix);
}
// Requires the local node (by address) plus a name that is never discovered, with a zero timeout, and
// expects the request to fail with an ElasticsearchTimeoutException as its root cause.
public void testGetsDiscoveredNodesTimeoutOnMissing() throws InterruptedException {
    setupGetDiscoveredNodesAction();

    final CountDownLatch latch = new CountDownLatch(1);
    final GetDiscoveredNodesRequest getDiscoveredNodesRequest = new GetDiscoveredNodesRequest();
    getDiscoveredNodesRequest.setRequiredNodes(Arrays.asList(localNode.getAddress().toString(), "_missing"));
    getDiscoveredNodesRequest.setTimeout(TimeValue.ZERO);
    transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, new ResponseHandler() {
        @Override
        public void handleResponse(GetDiscoveredNodesResponse response) {
            throw new AssertionError("should not be called");
        }

        @Override
        public void handleException(TransportException exp) {
            assertThat(exp.getRootCause(), instanceOf(ElasticsearchTimeoutException.class));
            latch.countDown();
        }
    });

    // await() returns false when the wait times out; the result must be asserted, otherwise the test would
    // pass even if the exception handler were never invoked.
    assertTrue(latch.await(10L, TimeUnit.SECONDS));
}
// Sends a request (default timeout) requiring the local address string plus a never-discovered name, and only
// afterwards triggers the peers request. The pending request must then fail with an
// IllegalArgumentException whose message begins "[<ip>] matches [".
public void testThrowsExceptionIfDuplicateDiscoveredLater() throws InterruptedException {
    new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action
    transportService.start();
    transportService.acceptIncomingRequests();
    coordinator.start();
    coordinator.startInitialJoin();

    final GetDiscoveredNodesRequest request = new GetDiscoveredNodesRequest();
    final String localIp = localNode.getAddress().getAddress();
    request.setRequiredNodes(Arrays.asList(localIp, "not-a-node"));

    final CountDownLatch rejected = new CountDownLatch(1);
    transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, request, new ResponseHandler() {
        @Override
        public void handleResponse(GetDiscoveredNodesResponse response) {
            throw new AssertionError("should not be called");
        }

        @Override
        public void handleException(TransportException exp) {
            final Throwable cause = exp.getRootCause();
            assertThat(cause, instanceOf(IllegalArgumentException.class));
            assertThat(cause.getMessage(), startsWith('[' + localIp + "] matches ["));
            rejected.countDown();
        }
    });

    // The peers request is issued only after the discovered-nodes request is already in flight.
    executeRequestPeersAction();
    assertTrue(rejected.await(10, TimeUnit.SECONDS));
}
// Submits a task to the generic thread pool that sends a PeersRequest, built from otherNode and an empty
// list, to the local node. Both the response and any failure are ignored.
private void executeRequestPeersAction() {
    threadPool.generic().execute(() -> {
        final PeersRequest peersRequest = new PeersRequest(otherNode, emptyList());
        transportService.sendRequest(localNode, REQUEST_PEERS_ACTION_NAME, peersRequest,
            new TransportResponseHandler<PeersResponse>() {
                @Override
                public String executor() {
                    return Names.SAME;
                }

                @Override
                public PeersResponse read(StreamInput in) throws IOException {
                    return new PeersResponse(in);
                }

                @Override
                public void handleResponse(PeersResponse response) {
                }

                @Override
                public void handleException(TransportException exp) {
                }
            });
    });
}
// Shared setup: registers the action, starts the transport service and coordinator, triggers the peers
// request, and then waits until a request requiring both node names is answered successfully.
private void setupGetDiscoveredNodesAction() throws InterruptedException {
    new TransportGetDiscoveredNodesAction(Settings.EMPTY, EMPTY_FILTERS, transportService, coordinator); // registers action
    transportService.start();
    transportService.acceptIncomingRequests();
    coordinator.start();
    coordinator.startInitialJoin();
    executeRequestPeersAction();

    final GetDiscoveredNodesRequest bothNodesRequest = new GetDiscoveredNodesRequest();
    final String localName = localNode.getName();
    final String otherName = otherNode.getName();
    bothNodesRequest.setRequiredNodes(Arrays.asList(localName, otherName));
    assertWaitConditionMet(bothNodesRequest);
}
// Sends the given request to the local node and asserts that, within ten seconds, it is answered with a
// response containing exactly localNode and otherNode (in any order). Any failure fails the test.
private void assertWaitConditionMet(GetDiscoveredNodesRequest getDiscoveredNodesRequest) throws InterruptedException {
    final CountDownLatch responded = new CountDownLatch(1);
    final ResponseHandler handler = new ResponseHandler() {
        @Override
        public void handleResponse(GetDiscoveredNodesResponse response) {
            assertThat(response.getNodes(), containsInAnyOrder(localNode, otherNode));
            responded.countDown();
        }

        @Override
        public void handleException(TransportException exp) {
            throw new AssertionError("should not be called", exp);
        }
    };
    transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, handler);
    assertTrue(responded.await(10, TimeUnit.SECONDS));
}
// Sends the given request to the local node and asserts that, within ten seconds, it fails with an
// IllegalArgumentException whose message starts with the given text. A successful response fails the test.
private void assertWaitConditionFailedOnDuplicate(GetDiscoveredNodesRequest getDiscoveredNodesRequest, String message)
    throws InterruptedException {
    final CountDownLatch rejected = new CountDownLatch(1);
    final ResponseHandler handler = new ResponseHandler() {
        @Override
        public void handleResponse(GetDiscoveredNodesResponse response) {
            throw new AssertionError("should not be called");
        }

        @Override
        public void handleException(TransportException exp) {
            final Throwable cause = exp.getRootCause();
            assertThat(cause, instanceOf(IllegalArgumentException.class));
            assertThat(cause.getMessage(), startsWith(message));
            rejected.countDown();
        }
    };
    transportService.sendRequest(localNode, GetDiscoveredNodesAction.NAME, getDiscoveredNodesRequest, handler);
    assertTrue(rejected.await(10, TimeUnit.SECONDS));
}
// Base class for the response handlers used in these tests: it fixes the executor to Names.SAME and the
// deserialisation of GetDiscoveredNodesResponse, leaving handleResponse/handleException to each test.
private abstract class ResponseHandler implements TransportResponseHandler<GetDiscoveredNodesResponse> {
    @Override
    public GetDiscoveredNodesResponse read(StreamInput in) throws IOException {
        final GetDiscoveredNodesResponse response = new GetDiscoveredNodesResponse(in);
        return response;
    }

    @Override
    public String executor() {
        return Names.SAME;
    }
}
}

View File

@ -20,49 +20,52 @@ package org.elasticsearch.cluster.coordination;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterAction;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterRequest;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapClusterResponse;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration.NodeDescription;
import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesAction;
import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesRequest;
import org.elasticsearch.action.admin.cluster.bootstrap.GetDiscoveredNodesResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.Settings.Builder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransport;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
import java.util.Set;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.BOOTSTRAP_PLACEHOLDER_PREFIX;
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING;
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.UNCONFIGURED_BOOTSTRAP_TIMEOUT_SETTING;
import static org.elasticsearch.common.settings.Settings.builder;
import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING;
import static org.elasticsearch.discovery.zen.SettingsBasedHostsProvider.DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
public class ClusterBootstrapServiceTests extends ESTestCase {
private DiscoveryNode localNode, otherNode1, otherNode2;
private DeterministicTaskQueue deterministicTaskQueue;
private TransportService transportService;
private ClusterBootstrapService clusterBootstrapService;
@Before
public void createServices() {
@ -81,10 +84,6 @@ public class ClusterBootstrapServiceTests extends ESTestCase {
transportService = transport.createTransportService(Settings.EMPTY, deterministicTaskQueue.getThreadPool(),
TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> localNode, null, emptySet());
clusterBootstrapService = new ClusterBootstrapService(Settings.builder().putList(INITIAL_MASTER_NODES_SETTING.getKey(),
localNode.getName(), otherNode1.getName(), otherNode2.getName()).build(),
transportService);
}
private DiscoveryNode newDiscoveryNode(String nodeName) {
@ -92,152 +91,392 @@ public class ClusterBootstrapServiceTests extends ESTestCase {
Version.CURRENT);
}
// Starts the transport service, lets it accept incoming requests, and then starts the cluster bootstrap
// service, in that order.
private void startServices() {
transportService.start();
transportService.acceptIncomingRequests();
clusterBootstrapService.start();
}
// Replaces the local node with one built with an empty role set, registers a discovery handler that fails
// the test if it is ever invoked, then starts the services and runs every queued task.
public void testDoesNothingOnNonMasterNodes() {
    localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);

    final TransportRequestHandler<GetDiscoveredNodesRequest> failOnDiscovery = (request, channel, task) -> {
        throw new AssertionError("should not make a discovery request");
    };
    transportService.registerRequestHandler(
        GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new, failOnDiscovery);

    startServices();
    deterministicTaskQueue.runAllTasks();
}
// The hosts-provider setting is put with no values; no discovery request may be made.
public void testDoesNothingByDefaultIfHostsProviderConfigured() {
    final Settings.Builder hostsProviderSettings = builder().putList(DISCOVERY_HOSTS_PROVIDER_SETTING.getKey());
    testConfiguredIfSettingSet(hostsProviderSettings);
}
// The unicast-hosts setting is put with no values; no discovery request may be made.
public void testDoesNothingByDefaultIfUnicastHostsConfigured() {
    final Settings.Builder unicastHostsSettings = builder().putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey());
    testConfiguredIfSettingSet(unicastHostsSettings);
}
// The initial-master-nodes setting is put with no values; no discovery request may be made.
public void testDoesNothingByDefaultIfMasterNodesConfigured() {
    final Settings.Builder initialMasterNodesSettings = builder().putList(INITIAL_MASTER_NODES_SETTING.getKey());
    testConfiguredIfSettingSet(initialMasterNodesSettings);
}
// Rebuilds the bootstrap service from the given settings, registers a discovery handler that fails the test
// if it is ever invoked, then starts the services and runs every queued task.
private void testConfiguredIfSettingSet(Builder builder) {
    final Settings settings = builder.build();
    clusterBootstrapService = new ClusterBootstrapService(settings, transportService);

    final TransportRequestHandler<GetDiscoveredNodesRequest> failOnDiscovery = (request, channel, task) -> {
        throw new AssertionError("should not make a discovery request");
    };
    transportService.registerRequestHandler(
        GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new, failOnDiscovery);

    startServices();
    deterministicTaskQueue.runAllTasks();
}
public void testBootstrapsAutomaticallyWithDefaultConfiguration() {
clusterBootstrapService = new ClusterBootstrapService(Settings.EMPTY, transportService);
final Settings.Builder settings = Settings.builder();
final long timeout;
if (randomBoolean()) {
timeout = UNCONFIGURED_BOOTSTRAP_TIMEOUT_SETTING.get(Settings.EMPTY).millis();
} else {
timeout = randomLongBetween(1, 10000);
settings.put(UNCONFIGURED_BOOTSTRAP_TIMEOUT_SETTING.getKey(), timeout + "ms");
}
final Set<DiscoveryNode> discoveredNodes = Stream.of(localNode, otherNode1, otherNode2).collect(Collectors.toSet());
transportService.registerRequestHandler(GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new,
(request, channel, task) -> channel.sendResponse(new GetDiscoveredNodesResponse(discoveredNodes)));
final AtomicReference<Supplier<Iterable<DiscoveryNode>>> discoveredNodesSupplier = new AtomicReference<>(() -> {
throw new AssertionError("should not be called yet");
});
final AtomicBoolean bootstrapped = new AtomicBoolean();
transportService.registerRequestHandler(BootstrapClusterAction.NAME, Names.SAME, BootstrapClusterRequest::new,
(request, channel, task) -> {
assertThat(request.getBootstrapConfiguration().getNodeDescriptions().stream()
.map(NodeDescription::getId).collect(Collectors.toSet()),
equalTo(discoveredNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet())));
ClusterBootstrapService clusterBootstrapService
= new ClusterBootstrapService(settings.build(), transportService, () -> discoveredNodesSupplier.get().get(),
() -> false, vc -> {
assertTrue(bootstrapped.compareAndSet(false, true));
assertThat(vc.getNodeIds(),
equalTo(Stream.of(localNode, otherNode1, otherNode2).map(DiscoveryNode::getId).collect(Collectors.toSet())));
assertThat(deterministicTaskQueue.getCurrentTimeMillis(), greaterThanOrEqualTo(timeout));
});
channel.sendResponse(new BootstrapClusterResponse(randomBoolean()));
assertTrue(bootstrapped.compareAndSet(false, true));
});
startServices();
deterministicTaskQueue.runAllTasks();
deterministicTaskQueue.scheduleAt(timeout - 1,
() -> discoveredNodesSupplier.set(() -> Stream.of(localNode, otherNode1, otherNode2).collect(Collectors.toSet())));
transportService.start();
clusterBootstrapService.scheduleUnconfiguredBootstrap();
deterministicTaskQueue.runAllTasksInTimeOrder();
assertTrue(bootstrapped.get());
}
public void testDoesNotRetryOnDiscoveryFailure() {
transportService.registerRequestHandler(GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new,
new TransportRequestHandler<GetDiscoveredNodesRequest>() {
private boolean called = false;
// The hosts-provider setting is put with no values; the unconfigured bootstrap must do nothing.
public void testDoesNothingByDefaultIfHostsProviderConfigured() {
    final Settings.Builder hostsProviderSettings = builder().putList(DISCOVERY_HOSTS_PROVIDER_SETTING.getKey());
    testDoesNothingWithSettings(hostsProviderSettings);
}
@Override
public void messageReceived(GetDiscoveredNodesRequest request, TransportChannel channel, Task task) {
assert called == false;
called = true;
throw new IllegalArgumentException("simulate failure of discovery request");
}
});
// The unicast-hosts setting is put with no values; the unconfigured bootstrap must do nothing.
public void testDoesNothingByDefaultIfUnicastHostsConfigured() {
    final Settings.Builder unicastHostsSettings = builder().putList(DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey());
    testDoesNothingWithSettings(unicastHostsSettings);
}
startServices();
// The initial-master-nodes setting is put with no values; the unconfigured bootstrap must do nothing.
public void testDoesNothingByDefaultIfMasterNodesConfigured() {
    final Settings.Builder initialMasterNodesSettings = builder().putList(INITIAL_MASTER_NODES_SETTING.getKey());
    testDoesNothingWithSettings(initialMasterNodesSettings);
}
// Replaces the local node with one built with a random id and an empty role set, then checks that the
// unconfigured bootstrap does nothing with default (empty) settings.
public void testDoesNothingByDefaultOnMasterIneligibleNodes() {
    final String nodeId = randomAlphaOfLength(10);
    localNode = new DiscoveryNode("local", nodeId, buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);

    final Settings.Builder defaultSettings = Settings.builder();
    testDoesNothingWithSettings(defaultSettings);
}
// Builds a bootstrap service from the given settings whose discovered-nodes supplier and voting-configuration
// consumer both fail the test when invoked, schedules the unconfigured bootstrap and runs every queued task.
private void testDoesNothingWithSettings(Settings.Builder builder) {
    final Settings settings = builder.build();
    final ClusterBootstrapService service = new ClusterBootstrapService(settings, transportService,
        () -> {
            throw new AssertionError("should not be called");
        },
        () -> false,
        votingConfiguration -> {
            throw new AssertionError("should not be called");
        });

    transportService.start();
    service.scheduleUnconfiguredBootstrap();
    deterministicTaskQueue.runAllTasks();
}
public void testBootstrapsOnDiscoverySuccess() {
final AtomicBoolean discoveryAttempted = new AtomicBoolean();
final Set<DiscoveryNode> discoveredNodes = Stream.of(localNode, otherNode1, otherNode2).collect(Collectors.toSet());
transportService.registerRequestHandler(GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new,
(request, channel, task) -> {
assertTrue(discoveryAttempted.compareAndSet(false, true));
channel.sendResponse(new GetDiscoveredNodesResponse(discoveredNodes));
});
final AtomicBoolean bootstrapAttempted = new AtomicBoolean();
transportService.registerRequestHandler(BootstrapClusterAction.NAME, Names.SAME, BootstrapClusterRequest::new,
(request, channel, task) -> {
assertTrue(bootstrapAttempted.compareAndSet(false, true));
channel.sendResponse(new BootstrapClusterResponse(false));
});
startServices();
public void testDoesNothingByDefaultIfZen1NodesDiscovered() {
final DiscoveryNode zen1Node = new DiscoveryNode("zen1", buildNewFakeTransportAddress(), singletonMap("zen1", "true"),
singleton(Role.MASTER), Version.CURRENT);
ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService(Settings.EMPTY, transportService, () ->
Stream.of(localNode, zen1Node).collect(Collectors.toSet()), () -> false, vc -> {
throw new AssertionError("should not be called");
});
transportService.start();
clusterBootstrapService.scheduleUnconfiguredBootstrap();
deterministicTaskQueue.runAllTasks();
assertTrue(discoveryAttempted.get());
assertTrue(bootstrapAttempted.get());
}
public void testRetriesOnBootstrapFailure() {
final Set<DiscoveryNode> discoveredNodes = Stream.of(localNode, otherNode1, otherNode2).collect(Collectors.toSet());
transportService.registerRequestHandler(GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new,
(request, channel, task) -> channel.sendResponse(new GetDiscoveredNodesResponse(discoveredNodes)));
AtomicLong callCount = new AtomicLong();
transportService.registerRequestHandler(BootstrapClusterAction.NAME, Names.SAME, BootstrapClusterRequest::new,
(request, channel, task) -> {
callCount.incrementAndGet();
channel.sendResponse(new ElasticsearchException("simulated exception"));
public void testThrowsExceptionOnDuplicates() {
final IllegalArgumentException illegalArgumentException = expectThrows(IllegalArgumentException.class, () -> {
new ClusterBootstrapService(builder().putList(
INITIAL_MASTER_NODES_SETTING.getKey(), "duplicate-requirement", "duplicate-requirement").build(),
transportService, Collections::emptyList, () -> false, vc -> {
throw new AssertionError("should not be called");
});
});
startServices();
while (callCount.get() < 5) {
if (deterministicTaskQueue.hasDeferredTasks()) {
deterministicTaskQueue.advanceTime();
}
deterministicTaskQueue.runAllRunnableTasks();
}
assertThat(illegalArgumentException.getMessage(), containsString(INITIAL_MASTER_NODES_SETTING.getKey()));
assertThat(illegalArgumentException.getMessage(), containsString("duplicate-requirement"));
}
public void testStopsRetryingBootstrapWhenStopped() {
final Set<DiscoveryNode> discoveredNodes = Stream.of(localNode, otherNode1, otherNode2).collect(Collectors.toSet());
transportService.registerRequestHandler(GetDiscoveredNodesAction.NAME, Names.SAME, GetDiscoveredNodesRequest::new,
(request, channel, task) -> channel.sendResponse(new GetDiscoveredNodesResponse(discoveredNodes)));
public void testBootstrapsOnDiscoveryOfAllRequiredNodes() {
final AtomicBoolean bootstrapped = new AtomicBoolean();
transportService.registerRequestHandler(BootstrapClusterAction.NAME, Names.SAME, BootstrapClusterRequest::new,
(request, channel, task) -> channel.sendResponse(new ElasticsearchException("simulated exception")));
ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService(Settings.builder().putList(
INITIAL_MASTER_NODES_SETTING.getKey(), localNode.getName(), otherNode1.getName(), otherNode2.getName()).build(),
transportService, () -> Stream.of(otherNode1, otherNode2).collect(Collectors.toList()), () -> false, vc -> {
assertTrue(bootstrapped.compareAndSet(false, true));
assertThat(vc.getNodeIds(), containsInAnyOrder(localNode.getId(), otherNode1.getId(), otherNode2.getId()));
assertThat(vc.getNodeIds(), not(hasItem(containsString("placeholder"))));
});
deterministicTaskQueue.scheduleAt(deterministicTaskQueue.getCurrentTimeMillis() + 200000, new Runnable() {
@Override
public void run() {
clusterBootstrapService.stop();
}
transportService.start();
clusterBootstrapService.onFoundPeersUpdated();
deterministicTaskQueue.runAllTasks();
assertTrue(bootstrapped.get());
@Override
public String toString() {
return "stop cluster bootstrap service";
bootstrapped.set(false);
clusterBootstrapService.onFoundPeersUpdated();
deterministicTaskQueue.runAllTasks();
assertFalse(bootstrapped.get()); // should only bootstrap once
}
// Three node names are required but only otherNode1 is supplied as discovered. The consumer must be called
// exactly once with a three-entry configuration: the local node, otherNode1 and a placeholder mentioning
// otherNode2's name. The two real ids together form a quorum; neither does alone.
public void testBootstrapsOnDiscoveryOfTwoOfThreeRequiredNodes() {
    final AtomicBoolean bootstrapped = new AtomicBoolean();
    final Settings settings = Settings.builder()
        .putList(INITIAL_MASTER_NODES_SETTING.getKey(), localNode.getName(), otherNode1.getName(), otherNode2.getName())
        .build();
    final ClusterBootstrapService service = new ClusterBootstrapService(settings, transportService,
        () -> singletonList(otherNode1),
        () -> false,
        votingConfiguration -> {
            assertTrue(bootstrapped.compareAndSet(false, true));
            assertThat(votingConfiguration.getNodeIds(), hasSize(3));
            assertThat(votingConfiguration.getNodeIds(), hasItem(localNode.getId()));
            assertThat(votingConfiguration.getNodeIds(), hasItem(otherNode1.getId()));
            assertThat(votingConfiguration.getNodeIds(),
                hasItem(allOf(startsWith(BOOTSTRAP_PLACEHOLDER_PREFIX), containsString(otherNode2.getName()))));
            assertTrue(votingConfiguration.hasQuorum(
                Stream.of(localNode, otherNode1).map(DiscoveryNode::getId).collect(Collectors.toList())));
            assertFalse(votingConfiguration.hasQuorum(singletonList(localNode.getId())));
            assertFalse(votingConfiguration.hasQuorum(singletonList(otherNode1.getId())));
        });

    transportService.start();

    // First notification triggers the bootstrap.
    service.onFoundPeersUpdated();
    deterministicTaskQueue.runAllTasks();
    assertTrue(bootstrapped.get());

    // Second notification must not bootstrap again.
    bootstrapped.set(false);
    service.onFoundPeersUpdated();
    deterministicTaskQueue.runAllTasks();
    assertFalse(bootstrapped.get()); // should only bootstrap once
}
// Five node names are required; otherNode1 and otherNode2 are supplied as discovered and two names are
// never discovered. The consumer must be called exactly once with a five-entry configuration holding the
// three real ids plus two distinct placeholders, one per missing name. The three real ids together form a
// quorum, and no pair of them does.
public void testBootstrapsOnDiscoveryOfThreeOfFiveRequiredNodes() {
    final AtomicBoolean bootstrapped = new AtomicBoolean();
    ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService(Settings.builder().putList(
        INITIAL_MASTER_NODES_SETTING.getKey(), localNode.getName(), otherNode1.getName(), otherNode2.getName(),
        "missing-node-1", "missing-node-2").build(),
        transportService, () -> Stream.of(otherNode1, otherNode2).collect(Collectors.toList()), () -> false, vc -> {
            assertTrue(bootstrapped.compareAndSet(false, true));
            assertThat(vc.getNodeIds(), hasSize(5));
            assertThat(vc.getNodeIds(), hasItem(localNode.getId()));
            assertThat(vc.getNodeIds(), hasItem(otherNode1.getId()));
            assertThat(vc.getNodeIds(), hasItem(otherNode2.getId()));

            final List<String> placeholders
                = vc.getNodeIds().stream().filter(ClusterBootstrapService::isBootstrapPlaceholder).collect(Collectors.toList());
            assertThat(placeholders.size(), equalTo(2));
            assertNotEquals(placeholders.get(0), placeholders.get(1));
            assertThat(placeholders, hasItem(containsString("missing-node-1")));
            assertThat(placeholders, hasItem(containsString("missing-node-2")));

            assertTrue(vc.hasQuorum(Stream.of(localNode, otherNode1, otherNode2).map(DiscoveryNode::getId).collect(Collectors.toList())));
            // Every two-node subset of the real nodes must fall short of a quorum. The original repeated the
            // (localNode, otherNode1) check twice and never covered the other two pairs.
            assertFalse(vc.hasQuorum(Stream.of(localNode, otherNode1).map(DiscoveryNode::getId).collect(Collectors.toList())));
            assertFalse(vc.hasQuorum(Stream.of(localNode, otherNode2).map(DiscoveryNode::getId).collect(Collectors.toList())));
            assertFalse(vc.hasQuorum(Stream.of(otherNode1, otherNode2).map(DiscoveryNode::getId).collect(Collectors.toList())));
        });

    transportService.start();
    clusterBootstrapService.onFoundPeersUpdated();
    deterministicTaskQueue.runAllTasks();
    assertTrue(bootstrapped.get());

    bootstrapped.set(false);
    clusterBootstrapService.onFoundPeersUpdated();
    deterministicTaskQueue.runAllTasks();
    assertFalse(bootstrapped.get()); // should only bootstrap once
}
// Three node names are required and the discovered-nodes supplier returns an empty list; the
// voting-configuration consumer must never be called.
public void testDoesNotBootstrapIfNoNodesDiscovered() {
    ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService(Settings.builder().putList(
        INITIAL_MASTER_NODES_SETTING.getKey(), localNode.getName(), otherNode1.getName(), otherNode2.getName()).build(),
        // Report the cluster as NOT yet bootstrapped. The original passed () -> true, which made this test
        // indistinguishable from testDoesNotBootstrapIfAlreadyBootstrapped: the consumer could stay uncalled
        // merely because of the already-bootstrapped state, regardless of how many nodes were discovered.
        transportService, Collections::emptyList, () -> false, vc -> {
            throw new AssertionError("should not be called");
        });
    transportService.start();
    clusterBootstrapService.onFoundPeersUpdated();
    deterministicTaskQueue.runAllTasks();
}
// Five node names are required but only otherNode1 is supplied as discovered; the voting-configuration
// consumer must never be called.
public void testDoesNotBootstrapIfTwoOfFiveNodesDiscovered() {
    final Settings settings = Settings.builder()
        .putList(INITIAL_MASTER_NODES_SETTING.getKey(),
            localNode.getName(), otherNode1.getName(), otherNode2.getName(), "not-a-node-1", "not-a-node-2")
        .build();
    final ClusterBootstrapService service = new ClusterBootstrapService(settings, transportService,
        () -> Stream.of(otherNode1).collect(Collectors.toList()),
        () -> false,
        votingConfiguration -> {
            throw new AssertionError("should not be called");
        });

    transportService.start();
    service.onFoundPeersUpdated();
    deterministicTaskQueue.runAllTasks();
}
/**
 * Six initial master nodes are configured but only {@code otherNode1} and {@code otherNode2} are reported
 * as discovered; the bootstrap callback must not fire.
 */
public void testDoesNotBootstrapIfThreeOfSixNodesDiscovered() {
    final Settings sixNodeSettings = Settings.builder()
        .putList(INITIAL_MASTER_NODES_SETTING.getKey(),
            localNode.getName(), otherNode1.getName(), otherNode2.getName(), "not-a-node-1", "not-a-node-2", "not-a-node-3")
        .build();

    final ClusterBootstrapService service = new ClusterBootstrapService(
        sixNodeSettings,
        transportService,
        () -> Stream.of(otherNode1, otherNode2).collect(Collectors.toList()),
        () -> false,
        unexpectedConfiguration -> {
            throw new AssertionError("should not be called");
        });

    transportService.start();
    service.onFoundPeersUpdated();
    deterministicTaskQueue.runAllTasks();
}
/**
 * All configured nodes are discoverable, but the boolean supplier handed to the service returns
 * {@code true}; the bootstrap callback must not fire.
 */
public void testDoesNotBootstrapIfAlreadyBootstrapped() {
    final Settings threeNodeSettings = Settings.builder()
        .putList(INITIAL_MASTER_NODES_SETTING.getKey(), localNode.getName(), otherNode1.getName(), otherNode2.getName())
        .build();

    final ClusterBootstrapService service = new ClusterBootstrapService(
        threeNodeSettings,
        transportService,
        () -> Stream.of(otherNode1, otherNode2).collect(Collectors.toList()),
        () -> true,
        unexpectedConfiguration -> {
            throw new AssertionError("should not be called");
        });

    transportService.start();
    service.onFoundPeersUpdated();
    deterministicTaskQueue.runAllTasks();
}
/**
 * Replaces the local node with one created with an empty role set, then checks that the bootstrap
 * callback does not fire even though every configured node is reported as discovered.
 */
public void testDoesNotBootstrapsOnNonMasterNode() {
    // The local node is swapped before the transport service is started.
    localNode = new DiscoveryNode(
        "local", randomAlphaOfLength(10), buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);

    final Settings threeNodeSettings = Settings.builder()
        .putList(INITIAL_MASTER_NODES_SETTING.getKey(), localNode.getName(), otherNode1.getName(), otherNode2.getName())
        .build();

    final ClusterBootstrapService service = new ClusterBootstrapService(
        threeNodeSettings,
        transportService,
        () -> Stream.of(localNode, otherNode1, otherNode2).collect(Collectors.toList()),
        () -> false,
        unexpectedConfiguration -> {
            throw new AssertionError("should not be called");
        });

    transportService.start();
    service.onFoundPeersUpdated();
    deterministicTaskQueue.runAllTasks();
}
/**
 * With an empty initial-master-nodes list, neither {@code scheduleUnconfiguredBootstrap()} nor
 * {@code onFoundPeersUpdated()} may trigger the bootstrap callback.
 */
public void testDoesNotBootstrapsIfNotConfigured() {
    final Settings emptyListSettings = Settings.builder().putList(INITIAL_MASTER_NODES_SETTING.getKey()).build();

    final ClusterBootstrapService service = new ClusterBootstrapService(
        emptyListSettings,
        transportService,
        () -> Stream.of(localNode, otherNode1, otherNode2).collect(Collectors.toList()),
        () -> false,
        unexpectedConfiguration -> {
            throw new AssertionError("should not be called");
        });

    transportService.start();
    service.scheduleUnconfiguredBootstrap();
    service.onFoundPeersUpdated();
    deterministicTaskQueue.runAllTasks();
}
/**
 * Adds a node carrying the attribute {@code zen1=true} to the discovered peers and checks that the
 * bootstrap callback does not fire.
 */
public void testDoesNotBootstrapsIfZen1NodesDiscovered() {
    final DiscoveryNode legacyNode = new DiscoveryNode(
        "zen1", buildNewFakeTransportAddress(), singletonMap("zen1", "true"), singleton(Role.MASTER), Version.CURRENT);

    final Settings threeNodeSettings = Settings.builder()
        .putList(INITIAL_MASTER_NODES_SETTING.getKey(), localNode.getName(), otherNode1.getName(), otherNode2.getName())
        .build();

    final ClusterBootstrapService service = new ClusterBootstrapService(
        threeNodeSettings,
        transportService,
        () -> Stream.of(otherNode1, otherNode2, legacyNode).collect(Collectors.toList()),
        () -> false,
        unexpectedConfiguration -> {
            throw new AssertionError("should not be called");
        });

    transportService.start();
    service.onFoundPeersUpdated();
    deterministicTaskQueue.runAllTasks();
}
/**
 * Makes the bootstrap callback throw on its first four invocations and checks that the service keeps
 * retrying until the callback succeeds.
 */
public void testRetriesBootstrappingOnException() {
    final AtomicLong bootstrappingAttempts = new AtomicLong();
    ClusterBootstrapService clusterBootstrapService = new ClusterBootstrapService(Settings.builder().putList(
        INITIAL_MASTER_NODES_SETTING.getKey(), localNode.getName(), otherNode1.getName(), otherNode2.getName()).build(),
        transportService, () -> Stream.of(otherNode1, otherNode2).collect(Collectors.toList()), () -> false, vc -> {
            // Count and test in one step so the threshold check always sees this invocation's own count.
            if (bootstrappingAttempts.incrementAndGet() < 5L) {
                throw new ElasticsearchException("test");
            }
        });

    // Only the transport service is started, exactly as in every other test in this class.
    transportService.start();
    clusterBootstrapService.onFoundPeersUpdated();
    deterministicTaskQueue.runAllTasks();

    // termination means success
    assertThat(bootstrappingAttempts.get(), greaterThanOrEqualTo(5L));
    // The simulated clock must have advanced, i.e. the retries were scheduled with a delay.
    assertThat(deterministicTaskQueue.getCurrentTimeMillis(), greaterThanOrEqualTo(40000L));
}
/**
 * Configures a single requirement (the local node's host address) while two other nodes are discovered,
 * and checks that the bootstrap callback is not invoked, neither then nor after the discovered set is
 * emptied.
 */
public void testCancelsBootstrapIfRequirementMatchesMultipleNodes() {
    final AtomicReference<Iterable<DiscoveryNode>> peers = new AtomicReference<>();
    peers.set(Stream.of(otherNode1, otherNode2).collect(Collectors.toList()));

    final Settings hostAddressSettings = Settings.builder()
        .putList(INITIAL_MASTER_NODES_SETTING.getKey(), localNode.getAddress().getAddress())
        .build();

    final ClusterBootstrapService service = new ClusterBootstrapService(
        hostAddressSettings,
        transportService,
        peers::get,
        () -> false,
        unexpectedConfiguration -> {
            throw new AssertionError("should not be called");
        });

    transportService.start();
    service.onFoundPeersUpdated();
    deterministicTaskQueue.runAllTasks();

    // Second round with no discovered peers at all: still no bootstrap expected.
    peers.set(emptyList());
    service.onFoundPeersUpdated();
    deterministicTaskQueue.runAllTasks();
}
/**
 * Configures two requirements that both describe {@code otherNode1} (its address and its name) and checks
 * that the bootstrap callback is not invoked, neither for the original peers nor after they are replaced by
 * two fresh nodes that each satisfy just one of the requirements.
 */
public void testCancelsBootstrapIfNodeMatchesMultipleRequirements() {
    final AtomicReference<Iterable<DiscoveryNode>> peers = new AtomicReference<>();
    peers.set(Stream.of(otherNode1, otherNode2).collect(Collectors.toList()));

    final Settings overlappingSettings = Settings.builder()
        .putList(INITIAL_MASTER_NODES_SETTING.getKey(), otherNode1.getAddress().toString(), otherNode1.getName())
        .build();

    final ClusterBootstrapService service = new ClusterBootstrapService(
        overlappingSettings,
        transportService,
        peers::get,
        () -> false,
        unexpectedConfiguration -> {
            throw new AssertionError("should not be called");
        });

    transportService.start();
    service.onFoundPeersUpdated();
    deterministicTaskQueue.runAllTasks();

    // One replacement shares otherNode1's name, the other shares its transport address.
    final DiscoveryNode sameNameNode = new DiscoveryNode(otherNode1.getName(), randomAlphaOfLength(10),
        buildNewFakeTransportAddress(), emptyMap(), singleton(Role.MASTER), Version.CURRENT);
    final DiscoveryNode sameAddressNode = new DiscoveryNode("yet-another-node", randomAlphaOfLength(10),
        otherNode1.getAddress(), emptyMap(), singleton(Role.MASTER), Version.CURRENT);
    peers.set(Stream.of(sameNameNode, sameAddressNode).collect(Collectors.toList()));

    service.onFoundPeersUpdated();
    deterministicTaskQueue.runAllTasks();
}
/**
 * A single requirement equal to the local node's name must lead to exactly one bootstrap callback, even
 * with no discovered peers.
 */
public void testMatchesOnNodeName() {
    final AtomicBoolean wasBootstrapped = new AtomicBoolean();
    final Settings nameSettings = Settings.builder()
        .putList(INITIAL_MASTER_NODES_SETTING.getKey(), localNode.getName())
        .build();

    final ClusterBootstrapService service = new ClusterBootstrapService(
        nameSettings,
        transportService,
        Collections::emptyList,
        () -> false,
        configuration -> {
            // compareAndSet fails on a second invocation, so this also guards against double bootstrapping.
            assertTrue(wasBootstrapped.compareAndSet(false, true));
        });

    transportService.start();
    service.onFoundPeersUpdated();
    deterministicTaskQueue.runAllTasks();
    assertTrue(wasBootstrapped.get());
}
/**
 * A single requirement equal to the string form of the local node's transport address must lead to exactly
 * one bootstrap callback, even with no discovered peers.
 */
public void testMatchesOnNodeAddress() {
    final AtomicBoolean wasBootstrapped = new AtomicBoolean();
    final Settings addressSettings = Settings.builder()
        .putList(INITIAL_MASTER_NODES_SETTING.getKey(), localNode.getAddress().toString())
        .build();

    final ClusterBootstrapService service = new ClusterBootstrapService(
        addressSettings,
        transportService,
        Collections::emptyList,
        () -> false,
        configuration -> {
            // compareAndSet fails on a second invocation, so this also guards against double bootstrapping.
            assertTrue(wasBootstrapped.compareAndSet(false, true));
        });

    transportService.start();
    service.onFoundPeersUpdated();
    deterministicTaskQueue.runAllTasks();
    assertTrue(wasBootstrapped.get());
}
/**
 * A single requirement equal to {@code localNode.getAddress().getAddress()} must lead to exactly one
 * bootstrap callback, even with no discovered peers.
 */
public void testMatchesOnNodeHostAddress() {
    final AtomicBoolean wasBootstrapped = new AtomicBoolean();
    final Settings hostAddressSettings = Settings.builder()
        .putList(INITIAL_MASTER_NODES_SETTING.getKey(), localNode.getAddress().getAddress())
        .build();

    final ClusterBootstrapService service = new ClusterBootstrapService(
        hostAddressSettings,
        transportService,
        Collections::emptyList,
        () -> false,
        configuration -> {
            // compareAndSet fails on a second invocation, so this also guards against double bootstrapping.
            assertTrue(wasBootstrapped.compareAndSet(false, true));
        });

    transportService.start();
    service.onFoundPeersUpdated();
    deterministicTaskQueue.runAllTasks();
    assertTrue(wasBootstrapped.get());
}
/**
 * A single requirement made of a random 10-character string must not cause the bootstrap callback to
 * fire when no peers are discovered.
 */
public void testDoesNotJustMatchEverything() {
    final Settings randomRequirementSettings = Settings.builder()
        .putList(INITIAL_MASTER_NODES_SETTING.getKey(), randomAlphaOfLength(10))
        .build();

    final ClusterBootstrapService service = new ClusterBootstrapService(
        randomRequirementSettings,
        transportService,
        Collections::emptyList,
        () -> false,
        unexpectedConfiguration -> {
            throw new AssertionError("should not be called");
        });

    transportService.start();
    service.onFoundPeersUpdated();
    deterministicTaskQueue.runAllTasks();
}
/**
 * Discovers one node beyond the three configured ones and checks that bootstrapping happens exactly once
 * with a voting configuration that does not contain the extra node's id.
 */
public void testDoesNotIncludeExtraNodes() {
    final DiscoveryNode extraNode = newDiscoveryNode("extra-node");
    final AtomicBoolean wasBootstrapped = new AtomicBoolean();

    final Settings threeNodeSettings = Settings.builder()
        .putList(INITIAL_MASTER_NODES_SETTING.getKey(), localNode.getName(), otherNode1.getName(), otherNode2.getName())
        .build();

    final ClusterBootstrapService service = new ClusterBootstrapService(
        threeNodeSettings,
        transportService,
        () -> Stream.of(otherNode1, otherNode2, extraNode).collect(Collectors.toList()),
        () -> false,
        configuration -> {
            assertTrue(wasBootstrapped.compareAndSet(false, true));
            assertThat(configuration.getNodeIds(), not(hasItem(extraNode.getId())));
        });

    transportService.start();
    service.onFoundPeersUpdated();
    deterministicTaskQueue.runAllTasks();
    assertTrue(wasBootstrapped.get());
}
}

View File

@ -38,6 +38,7 @@ import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.singletonList;
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.BOOTSTRAP_PLACEHOLDER_PREFIX;
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.equalTo;
@ -245,6 +246,13 @@ public class ClusterFormationFailureHelperTests extends ESTestCase {
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", BOOTSTRAP_PLACEHOLDER_PREFIX + "n3"),
emptyList(), emptyList(), 0L).getDescription(),
is("master not discovered or elected yet, an election requires 2 nodes with ids [n1, n2], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3", "n4"), emptyList(), emptyList(), 0L)
.getDescription(),
is("master not discovered or elected yet, an election requires at least 3 nodes with ids from [n1, n2, n3, n4], " +
@ -259,6 +267,20 @@ public class ClusterFormationFailureHelperTests extends ESTestCase {
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3", "n4", BOOTSTRAP_PLACEHOLDER_PREFIX + "n5"),
emptyList(), emptyList(), 0L).getDescription(),
is("master not discovered or elected yet, an election requires at least 3 nodes with ids from [n1, n2, n3, n4], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, "n1", "n2", "n3",
BOOTSTRAP_PLACEHOLDER_PREFIX + "n4", BOOTSTRAP_PLACEHOLDER_PREFIX + "n5"), emptyList(), emptyList(), 0L).getDescription(),
is("master not discovered or elected yet, an election requires 3 nodes with ids [n1, n2, n3], " +
"have discovered [] which is not a quorum; " +
"discovery will continue using [] from hosts providers and [" + localNode +
"] from last-known cluster state; node term 0, last-accepted version 0 in term 0"));
assertThat(new ClusterFormationState(Settings.EMPTY, state(localNode, new String[]{"n1"}, new String[]{"n1"}), emptyList(),
emptyList(), 0L).getDescription(),
is("master not discovered or elected yet, an election requires a node with id [n1], " +

View File

@ -26,7 +26,6 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
@ -48,7 +47,6 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@ -82,15 +80,16 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.BOOTSTRAP_PLACEHOLDER_PREFIX;
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.clusterState;
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.setValue;
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.value;
@ -125,7 +124,6 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
@ -726,70 +724,6 @@ public class CoordinatorTests extends ESTestCase {
// assertTrue("expected ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1));
}
/**
 * Exercises the coordinator's discovery-listener hook in three phases: a listener that is registered and
 * immediately released must never be called; a listener registered during the discovery phase must
 * eventually see all other nodes as peers; and a listener registered while the cluster stabilises must be
 * failed with a {@link ClusterAlreadyBootstrappedException}.
 */
public void testDiscoveryOfPeersTriggersNotification() {
final Cluster cluster = new Cluster(randomIntBetween(2, 5));
// register a listener and then deregister it again to show that it is not called after deregistration
try (Releasable ignored = cluster.getAnyNode().coordinator.withDiscoveryListener(ActionListener.wrap(() -> {
throw new AssertionError("should not be called");
}))) {
// do nothing
}
// Reference point for measuring how long (in simulated time) discovery takes.
final long startTimeMillis = cluster.deterministicTaskQueue.getCurrentTimeMillis();
final ClusterNode bootstrapNode = cluster.getAnyNode();
final AtomicBoolean hasDiscoveredAllPeers = new AtomicBoolean();
// Nothing has run yet, so the chosen node must not know about any peers.
assertFalse(bootstrapNode.coordinator.getFoundPeers().iterator().hasNext());
try (Releasable ignored = bootstrapNode.coordinator.withDiscoveryListener(
new ActionListener<Iterable<DiscoveryNode>>() {
@Override
public void onResponse(Iterable<DiscoveryNode> discoveryNodes) {
// Iterable has no size(), so count the reported peers by walking it.
int peerCount = 0;
for (final DiscoveryNode discoveryNode : discoveryNodes) {
peerCount++;
}
// The reported peer count must always stay below the cluster size.
assertThat(peerCount, lessThan(cluster.size()));
// Only the first notification that reports every other node is timed and recorded.
if (peerCount == cluster.size() - 1 && hasDiscoveredAllPeers.get() == false) {
hasDiscoveredAllPeers.set(true);
final long elapsedTimeMillis = cluster.deterministicTaskQueue.getCurrentTimeMillis() - startTimeMillis;
logger.info("--> {} discovered {} peers in {}ms", bootstrapNode.getId(), cluster.size() - 1, elapsedTimeMillis);
assertThat(elapsedTimeMillis, lessThanOrEqualTo(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2));
}
}
@Override
public void onFailure(Exception e) {
throw new AssertionError("unexpected", e);
}
})) {
// The listener is only registered for the duration of this discovery run.
cluster.runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2 + randomLongBetween(0, 60000), "discovery phase");
}
assertTrue(hasDiscoveredAllPeers.get());
final AtomicBoolean receivedAlreadyBootstrappedException = new AtomicBoolean();
try (Releasable ignored = bootstrapNode.coordinator.withDiscoveryListener(
new ActionListener<Iterable<DiscoveryNode>>() {
@Override
public void onResponse(Iterable<DiscoveryNode> discoveryNodes) {
// ignore
}
@Override
public void onFailure(Exception e) {
// The already-bootstrapped failure is the only one this phase tolerates.
if (e instanceof ClusterAlreadyBootstrappedException) {
receivedAlreadyBootstrappedException.set(true);
} else {
throw new AssertionError("unexpected", e);
}
}
})) {
// NOTE(review): presumably stabilise() bootstraps the cluster, which is what fails the listener — confirm.
cluster.stabilise();
}
assertTrue(receivedAlreadyBootstrappedException.get());
}
public void testSettingInitialConfigurationTriggersElection() {
final Cluster cluster = new Cluster(randomIntBetween(1, 5));
cluster.runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2 + randomLongBetween(0, 60000), "initial discovery phase");
@ -1271,12 +1205,8 @@ public class CoordinatorTests extends ESTestCase {
}
} else if (rarely()) {
final ClusterNode clusterNode = getAnyNode();
clusterNode.onNode(
() -> {
logger.debug("----> [runRandomly {}] applying initial configuration {} to {}",
thisStep, initialConfiguration, clusterNode.getId());
clusterNode.coordinator.setInitialConfiguration(initialConfiguration);
}).run();
logger.debug("----> [runRandomly {}] applying initial configuration on {}", step, clusterNode.getId());
clusterNode.applyInitialConfiguration();
} else {
if (deterministicTaskQueue.hasDeferredTasks() && randomBoolean()) {
deterministicTaskQueue.advanceTime();
@ -1803,11 +1733,18 @@ public class CoordinatorTests extends ESTestCase {
/**
 * Tries to set this node's initial voting configuration to a randomised variant of the cluster's
 * {@code initialConfiguration} in which some node ids may be replaced by bootstrap placeholders.
 *
 * The candidate pool is the real node ids plus {@code (max(n, 2) - 1) / 2} freshly generated placeholder
 * ids, where {@code n} is the size of the initial configuration; a random subset of size {@code n} is
 * drawn from that pool. A {@link CoordinationStateRejectedException} is logged and otherwise ignored.
 */
void applyInitialConfiguration() {
    onNode(() -> {
        final Set<String> nodeIdsWithPlaceholders = new HashSet<>(initialConfiguration.getNodeIds());
        Stream.generate(() -> BOOTSTRAP_PLACEHOLDER_PREFIX + UUIDs.randomBase64UUID(random()))
            .limit((Math.max(initialConfiguration.getNodeIds().size(), 2) - 1) / 2)
            .forEach(nodeIdsWithPlaceholders::add);
        final VotingConfiguration configurationWithPlaceholders = new VotingConfiguration(new HashSet<>(
            randomSubsetOf(initialConfiguration.getNodeIds().size(), nodeIdsWithPlaceholders)));
        try {
            // Apply only the placeholder-laden configuration: setting the plain initialConfiguration first
            // would leave the placeholder variant computed above unused as the initial configuration.
            coordinator.setInitialConfiguration(configurationWithPlaceholders);
            logger.info("successfully set initial configuration to {}", configurationWithPlaceholders);
        } catch (CoordinationStateRejectedException e) {
            logger.info(new ParameterizedMessage("failed to set initial configuration to {}",
                configurationWithPlaceholders), e);
        }
    }).run();
}

View File

@ -24,7 +24,6 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.bootstrap.BootstrapConfiguration;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryAction;
import org.elasticsearch.action.admin.cluster.repositories.put.TransportPutRepositoryAction;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotAction;
@ -48,13 +47,14 @@ import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
import org.elasticsearch.cluster.coordination.CoordinationState;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.CoordinatorTests;
import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
import org.elasticsearch.cluster.coordination.InMemoryPersistedState;
import org.elasticsearch.cluster.coordination.MockSinglePrioritizingExecutor;
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
import org.elasticsearch.cluster.metadata.AliasValidator;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -254,14 +254,10 @@ public class SnapshotsServiceTests extends ESTestCase {
deterministicTaskQueue.advanceTime();
deterministicTaskQueue.runAllRunnableTasks();
final BootstrapConfiguration bootstrapConfiguration = new BootstrapConfiguration(
testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode())
.map(node -> new BootstrapConfiguration.NodeDescription(node.node))
.distinct()
.collect(Collectors.toList()));
final VotingConfiguration votingConfiguration = new VotingConfiguration(testClusterNodes.nodes.values().stream().map(n -> n.node)
.filter(DiscoveryNode::isMasterNode).map(DiscoveryNode::getId).collect(Collectors.toSet()));
testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode()).forEach(
testClusterNode -> testClusterNode.coordinator.setInitialConfiguration(bootstrapConfiguration)
);
testClusterNode -> testClusterNode.coordinator.setInitialConfiguration(votingConfiguration));
runUntil(
() -> {