Enforce cluster UUIDs (#37775)
This commit adds join validation around cluster UUIDs, preventing a node to join a cluster if it was previously part of another cluster. The commit introduces a new flag to the cluster state, clusterUUIDCommitted, which denotes whether the node has locked into a cluster with the given uuid. When a cluster is committed, this flag will turn to true, and subsequent cluster state updates will keep the information about committal. Note that coordinating-only nodes are still free to switch clusters at will (after restart), as they don't carry any persistent state.
This commit is contained in:
parent
09a11a34ef
commit
3c9f7031b9
|
@ -422,7 +422,7 @@ public class CoordinationState {
|
|||
logger.trace("handleCommit: applying commit request for term [{}] and version [{}]", applyCommit.getTerm(),
|
||||
applyCommit.getVersion());
|
||||
|
||||
persistedState.markLastAcceptedConfigAsCommitted();
|
||||
persistedState.markLastAcceptedStateAsCommitted();
|
||||
assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration());
|
||||
}
|
||||
|
||||
|
@ -471,16 +471,32 @@ public class CoordinationState {
|
|||
/**
|
||||
* Marks the last accepted cluster state as committed.
|
||||
* After a successful call to this method, {@link #getLastAcceptedState()} should return the last cluster state that was set,
|
||||
* with the last committed configuration now corresponding to the last accepted configuration.
|
||||
* with the last committed configuration now corresponding to the last accepted configuration, and the cluster uuid, if set,
|
||||
* marked as committed.
|
||||
*/
|
||||
default void markLastAcceptedConfigAsCommitted() {
|
||||
default void markLastAcceptedStateAsCommitted() {
|
||||
final ClusterState lastAcceptedState = getLastAcceptedState();
|
||||
MetaData.Builder metaDataBuilder = null;
|
||||
if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) {
|
||||
final CoordinationMetaData coordinationMetaData = CoordinationMetaData.builder(lastAcceptedState.coordinationMetaData())
|
||||
.lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration())
|
||||
.build();
|
||||
final MetaData metaData = MetaData.builder(lastAcceptedState.metaData()).coordinationMetaData(coordinationMetaData).build();
|
||||
setLastAcceptedState(ClusterState.builder(lastAcceptedState).metaData(metaData).build());
|
||||
metaDataBuilder = MetaData.builder(lastAcceptedState.metaData());
|
||||
metaDataBuilder.coordinationMetaData(coordinationMetaData);
|
||||
}
|
||||
// if we receive a commit from a Zen1 master that has not recovered its state yet, the cluster uuid might not been known yet.
|
||||
assert lastAcceptedState.metaData().clusterUUID().equals(MetaData.UNKNOWN_CLUSTER_UUID) == false ||
|
||||
lastAcceptedState.term() == ZEN1_BWC_TERM :
|
||||
"received cluster state with empty cluster uuid but not Zen1 BWC term: " + lastAcceptedState;
|
||||
if (lastAcceptedState.metaData().clusterUUID().equals(MetaData.UNKNOWN_CLUSTER_UUID) == false &&
|
||||
lastAcceptedState.metaData().clusterUUIDCommitted() == false) {
|
||||
if (metaDataBuilder == null) {
|
||||
metaDataBuilder = MetaData.builder(lastAcceptedState.metaData());
|
||||
}
|
||||
metaDataBuilder.clusterUUIDCommitted(true);
|
||||
}
|
||||
if (metaDataBuilder != null) {
|
||||
setLastAcceptedState(ClusterState.builder(lastAcceptedState).metaData(metaDataBuilder).build());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -147,7 +147,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|||
this.masterService = masterService;
|
||||
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
|
||||
this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
|
||||
this::getCurrentTerm, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators);
|
||||
this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators);
|
||||
this.persistedStateSupplier = persistedStateSupplier;
|
||||
this.discoverySettings = new DiscoverySettings(settings, clusterSettings);
|
||||
this.lastKnownLeader = Optional.empty();
|
||||
|
@ -281,7 +281,18 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|||
+ lastKnownLeader + ", rejecting");
|
||||
}
|
||||
|
||||
if (publishRequest.getAcceptedState().term() > coordinationState.get().getLastAcceptedState().term()) {
|
||||
final ClusterState localState = coordinationState.get().getLastAcceptedState();
|
||||
|
||||
if (localState.metaData().clusterUUIDCommitted() &&
|
||||
localState.metaData().clusterUUID().equals(publishRequest.getAcceptedState().metaData().clusterUUID()) == false) {
|
||||
logger.warn("received cluster state from {} with a different cluster uuid {} than local cluster uuid {}, rejecting",
|
||||
sourceNode, publishRequest.getAcceptedState().metaData().clusterUUID(), localState.metaData().clusterUUID());
|
||||
throw new CoordinationStateRejectedException("received cluster state from " + sourceNode +
|
||||
" with a different cluster uuid " + publishRequest.getAcceptedState().metaData().clusterUUID() +
|
||||
" than local cluster uuid " + localState.metaData().clusterUUID() + ", rejecting");
|
||||
}
|
||||
|
||||
if (publishRequest.getAcceptedState().term() > localState.term()) {
|
||||
// only do join validation if we have not accepted state from this master yet
|
||||
onJoinValidators.forEach(a -> a.accept(getLocalNode(), publishRequest.getAcceptedState()));
|
||||
}
|
||||
|
@ -653,6 +664,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
|||
assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState();
|
||||
assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState();
|
||||
assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID);
|
||||
assert applierState.nodes().getMasterNodeId() == null || applierState.metaData().clusterUUIDCommitted();
|
||||
assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse())
|
||||
: preVoteCollector + " vs " + getPreVoteResponse();
|
||||
|
||||
|
|
|
@ -62,6 +62,7 @@ import java.util.Set;
|
|||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.LongSupplier;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class JoinHelper {
|
||||
|
||||
|
@ -84,7 +85,7 @@ public class JoinHelper {
|
|||
final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = ConcurrentCollections.newConcurrentSet();
|
||||
|
||||
public JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
|
||||
TransportService transportService, LongSupplier currentTermSupplier,
|
||||
TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
|
||||
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
|
||||
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators) {
|
||||
this.masterService = masterService;
|
||||
|
@ -132,6 +133,13 @@ public class JoinHelper {
|
|||
transportService.registerRequestHandler(VALIDATE_JOIN_ACTION_NAME,
|
||||
MembershipAction.ValidateJoinRequest::new, ThreadPool.Names.GENERIC,
|
||||
(request, channel, task) -> {
|
||||
final ClusterState localState = currentStateSupplier.get();
|
||||
if (localState.metaData().clusterUUIDCommitted() &&
|
||||
localState.metaData().clusterUUID().equals(request.getState().metaData().clusterUUID()) == false) {
|
||||
throw new CoordinationStateRejectedException("join validation on cluster state" +
|
||||
" with a different cluster uuid " + request.getState().metaData().clusterUUID() +
|
||||
" than local cluster uuid " + localState.metaData().clusterUUID() + ", rejecting");
|
||||
}
|
||||
joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), request.getState()));
|
||||
channel.sendResponse(Empty.INSTANCE);
|
||||
});
|
||||
|
|
|
@ -88,6 +88,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
|
|||
private static final Logger logger = LogManager.getLogger(MetaData.class);
|
||||
|
||||
public static final String ALL = "_all";
|
||||
public static final String UNKNOWN_CLUSTER_UUID = "_na_";
|
||||
|
||||
public enum XContentContext {
|
||||
/* Custom metadata should be returns as part of API call */
|
||||
|
@ -159,6 +160,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
|
|||
private static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);
|
||||
|
||||
private final String clusterUUID;
|
||||
private final boolean clusterUUIDCommitted;
|
||||
private final long version;
|
||||
|
||||
private final CoordinationMetaData coordinationMetaData;
|
||||
|
@ -179,12 +181,13 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
|
|||
|
||||
private final SortedMap<String, AliasOrIndex> aliasAndIndexLookup;
|
||||
|
||||
MetaData(String clusterUUID, long version, CoordinationMetaData coordinationMetaData,
|
||||
MetaData(String clusterUUID, boolean clusterUUIDCommitted, long version, CoordinationMetaData coordinationMetaData,
|
||||
Settings transientSettings, Settings persistentSettings,
|
||||
ImmutableOpenMap<String, IndexMetaData> indices, ImmutableOpenMap<String, IndexTemplateMetaData> templates,
|
||||
ImmutableOpenMap<String, Custom> customs, String[] allIndices, String[] allOpenIndices, String[] allClosedIndices,
|
||||
SortedMap<String, AliasOrIndex> aliasAndIndexLookup) {
|
||||
this.clusterUUID = clusterUUID;
|
||||
this.clusterUUIDCommitted = clusterUUIDCommitted;
|
||||
this.version = version;
|
||||
this.coordinationMetaData = coordinationMetaData;
|
||||
this.transientSettings = transientSettings;
|
||||
|
@ -218,6 +221,14 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
|
|||
return this.clusterUUID;
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether the current node with the given cluster state is locked into the cluster with the UUID returned by {@link #clusterUUID()},
|
||||
* meaning that it will not accept any cluster state with a different clusterUUID.
|
||||
*/
|
||||
public boolean clusterUUIDCommitted() {
|
||||
return this.clusterUUIDCommitted;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the merged transient and persistent settings.
|
||||
*/
|
||||
|
@ -757,6 +768,12 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
|
|||
if (!metaData1.templates.equals(metaData2.templates())) {
|
||||
return false;
|
||||
}
|
||||
if (!metaData1.clusterUUID.equals(metaData2.clusterUUID)) {
|
||||
return false;
|
||||
}
|
||||
if (metaData1.clusterUUIDCommitted != metaData2.clusterUUIDCommitted) {
|
||||
return false;
|
||||
}
|
||||
// Check if any persistent metadata needs to be saved
|
||||
int customCount1 = 0;
|
||||
for (ObjectObjectCursor<String, Custom> cursor : metaData1.customs) {
|
||||
|
@ -798,6 +815,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
|
|||
|
||||
private long version;
|
||||
private String clusterUUID;
|
||||
private boolean clusterUUIDCommitted;
|
||||
private CoordinationMetaData coordinationMetaData;
|
||||
private Settings transientSettings;
|
||||
private Settings persistentSettings;
|
||||
|
@ -807,6 +825,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
|
|||
|
||||
MetaDataDiff(MetaData before, MetaData after) {
|
||||
clusterUUID = after.clusterUUID;
|
||||
clusterUUIDCommitted = after.clusterUUIDCommitted;
|
||||
version = after.version;
|
||||
coordinationMetaData = after.coordinationMetaData;
|
||||
transientSettings = after.transientSettings;
|
||||
|
@ -818,8 +837,11 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
|
|||
|
||||
MetaDataDiff(StreamInput in) throws IOException {
|
||||
clusterUUID = in.readString();
|
||||
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||
clusterUUIDCommitted = in.readBoolean();
|
||||
}
|
||||
version = in.readLong();
|
||||
if (in.getVersion().onOrAfter(Version.V_7_0_0)) { //TODO revisit after Zen2 BWC is implemented
|
||||
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||
coordinationMetaData = new CoordinationMetaData(in);
|
||||
} else {
|
||||
coordinationMetaData = CoordinationMetaData.EMPTY_META_DATA;
|
||||
|
@ -836,6 +858,9 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
|
|||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(clusterUUID);
|
||||
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||
out.writeBoolean(clusterUUIDCommitted);
|
||||
}
|
||||
out.writeLong(version);
|
||||
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||
coordinationMetaData.writeTo(out);
|
||||
|
@ -851,6 +876,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
|
|||
public MetaData apply(MetaData part) {
|
||||
Builder builder = builder();
|
||||
builder.clusterUUID(clusterUUID);
|
||||
builder.clusterUUIDCommitted(clusterUUIDCommitted);
|
||||
builder.version(version);
|
||||
builder.coordinationMetaData(coordinationMetaData);
|
||||
builder.transientSettings(transientSettings);
|
||||
|
@ -866,6 +892,9 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
|
|||
Builder builder = new Builder();
|
||||
builder.version = in.readLong();
|
||||
builder.clusterUUID = in.readString();
|
||||
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||
builder.clusterUUIDCommitted = in.readBoolean();
|
||||
}
|
||||
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||
builder.coordinationMetaData(new CoordinationMetaData(in));
|
||||
}
|
||||
|
@ -891,6 +920,9 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
|
|||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeLong(version);
|
||||
out.writeString(clusterUUID);
|
||||
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||
out.writeBoolean(clusterUUIDCommitted);
|
||||
}
|
||||
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||
coordinationMetaData.writeTo(out);
|
||||
}
|
||||
|
@ -930,6 +962,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
|
|||
public static class Builder {
|
||||
|
||||
private String clusterUUID;
|
||||
private boolean clusterUUIDCommitted;
|
||||
private long version;
|
||||
|
||||
private CoordinationMetaData coordinationMetaData = CoordinationMetaData.EMPTY_META_DATA;
|
||||
|
@ -941,7 +974,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
|
|||
private final ImmutableOpenMap.Builder<String, Custom> customs;
|
||||
|
||||
public Builder() {
|
||||
clusterUUID = "_na_";
|
||||
clusterUUID = UNKNOWN_CLUSTER_UUID;
|
||||
indices = ImmutableOpenMap.builder();
|
||||
templates = ImmutableOpenMap.builder();
|
||||
customs = ImmutableOpenMap.builder();
|
||||
|
@ -950,6 +983,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
|
|||
|
||||
public Builder(MetaData metaData) {
|
||||
this.clusterUUID = metaData.clusterUUID;
|
||||
this.clusterUUIDCommitted = metaData.clusterUUIDCommitted;
|
||||
this.coordinationMetaData = metaData.coordinationMetaData;
|
||||
this.transientSettings = metaData.transientSettings;
|
||||
this.persistentSettings = metaData.persistentSettings;
|
||||
|
@ -1125,8 +1159,13 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder clusterUUIDCommitted(boolean clusterUUIDCommitted) {
|
||||
this.clusterUUIDCommitted = clusterUUIDCommitted;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder generateClusterUuidIfNeeded() {
|
||||
if (clusterUUID.equals("_na_")) {
|
||||
if (clusterUUID.equals(UNKNOWN_CLUSTER_UUID)) {
|
||||
clusterUUID = UUIDs.randomBase64UUID();
|
||||
}
|
||||
return this;
|
||||
|
@ -1182,8 +1221,9 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
|
|||
String[] allOpenIndicesArray = allOpenIndices.toArray(new String[allOpenIndices.size()]);
|
||||
String[] allClosedIndicesArray = allClosedIndices.toArray(new String[allClosedIndices.size()]);
|
||||
|
||||
return new MetaData(clusterUUID, version, coordinationMetaData, transientSettings, persistentSettings, indices.build(),
|
||||
templates.build(), customs.build(), allIndicesArray, allOpenIndicesArray, allClosedIndicesArray, aliasAndIndexLookup);
|
||||
return new MetaData(clusterUUID, clusterUUIDCommitted, version, coordinationMetaData, transientSettings, persistentSettings,
|
||||
indices.build(), templates.build(), customs.build(), allIndicesArray, allOpenIndicesArray, allClosedIndicesArray,
|
||||
aliasAndIndexLookup);
|
||||
}
|
||||
|
||||
private SortedMap<String, AliasOrIndex> buildAliasAndIndexLookup() {
|
||||
|
@ -1226,6 +1266,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
|
|||
|
||||
builder.field("version", metaData.version());
|
||||
builder.field("cluster_uuid", metaData.clusterUUID);
|
||||
builder.field("cluster_uuid_committed", metaData.clusterUUIDCommitted);
|
||||
|
||||
builder.startObject("cluster_coordination");
|
||||
metaData.coordinationMetaData().toXContent(builder, params);
|
||||
|
@ -1324,6 +1365,8 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
|
|||
builder.version = parser.longValue();
|
||||
} else if ("cluster_uuid".equals(currentFieldName) || "uuid".equals(currentFieldName)) {
|
||||
builder.clusterUUID = parser.text();
|
||||
} else if ("cluster_uuid_committed".equals(currentFieldName)) {
|
||||
builder.clusterUUIDCommitted = parser.booleanValue();
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unexpected field [" + currentFieldName + "]");
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.elasticsearch.cluster.coordination;
|
|||
|
||||
import com.carrotsearch.randomizedtesting.RandomizedContext;
|
||||
import org.apache.logging.log4j.CloseableThreadContext;
|
||||
import org.apache.logging.log4j.Level;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
|
@ -36,6 +37,7 @@ import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfigu
|
|||
import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState;
|
||||
import org.elasticsearch.cluster.coordination.Coordinator.Mode;
|
||||
import org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.ClusterNode;
|
||||
import org.elasticsearch.cluster.metadata.Manifest;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
|
||||
|
@ -48,6 +50,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
|||
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -59,9 +62,11 @@ import org.elasticsearch.common.util.set.Sets;
|
|||
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
|
||||
import org.elasticsearch.discovery.zen.UnicastHostsProvider.HostsResolver;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.gateway.MetaStateService;
|
||||
import org.elasticsearch.gateway.MockGatewayMetaState;
|
||||
import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.MockLogAppender;
|
||||
import org.elasticsearch.test.disruption.DisruptableMockTransport;
|
||||
import org.elasticsearch.test.disruption.DisruptableMockTransport.ConnectionStatus;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
@ -84,6 +89,7 @@ import java.util.Map;
|
|||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
@ -137,6 +143,13 @@ public class CoordinatorTests extends ESTestCase {
|
|||
|
||||
private final List<NodeEnvironment> nodeEnvironments = new ArrayList<>();
|
||||
|
||||
private final AtomicInteger nextNodeIndex = new AtomicInteger();
|
||||
|
||||
@Before
|
||||
public void resetNodeIndexBeforeEachTest() {
|
||||
nextNodeIndex.set(0);
|
||||
}
|
||||
|
||||
@After
|
||||
public void closeNodeEnvironmentsAfterEachTest() {
|
||||
for (NodeEnvironment nodeEnvironment : nodeEnvironments) {
|
||||
|
@ -153,6 +166,7 @@ public class CoordinatorTests extends ESTestCase {
|
|||
// check that runRandomly leads to reproducible results
|
||||
public void testRepeatableTests() throws Exception {
|
||||
final Callable<Long> test = () -> {
|
||||
resetNodeIndexBeforeEachTest();
|
||||
final Cluster cluster = new Cluster(randomIntBetween(1, 5));
|
||||
cluster.runRandomly();
|
||||
final long afterRunRandomly = value(cluster.getAnyNode().getLastAppliedClusterState());
|
||||
|
@ -1001,6 +1015,52 @@ public class CoordinatorTests extends ESTestCase {
|
|||
assertTrue(cluster.clusterNodes.stream().allMatch(cn -> cn.getLastAppliedClusterState().version() == 0));
|
||||
}
|
||||
|
||||
public void testCannotJoinClusterWithDifferentUUID() throws IllegalAccessException {
|
||||
final Cluster cluster1 = new Cluster(randomIntBetween(1, 3));
|
||||
cluster1.runRandomly();
|
||||
cluster1.stabilise();
|
||||
|
||||
final Cluster cluster2 = new Cluster(3);
|
||||
cluster2.runRandomly();
|
||||
cluster2.stabilise();
|
||||
|
||||
final ClusterNode shiftedNode = randomFrom(cluster2.clusterNodes).restartedNode();
|
||||
final ClusterNode newNode = cluster1.new ClusterNode(nextNodeIndex.getAndIncrement(),
|
||||
shiftedNode.getLocalNode(), n -> shiftedNode.persistedState);
|
||||
cluster1.clusterNodes.add(newNode);
|
||||
|
||||
MockLogAppender mockAppender = new MockLogAppender();
|
||||
mockAppender.start();
|
||||
mockAppender.addExpectation(
|
||||
new MockLogAppender.SeenEventExpectation(
|
||||
"test1",
|
||||
JoinHelper.class.getCanonicalName(),
|
||||
Level.INFO,
|
||||
"*failed to join*"));
|
||||
Logger joinLogger = LogManager.getLogger(JoinHelper.class);
|
||||
Loggers.addAppender(joinLogger, mockAppender);
|
||||
cluster1.runFor(DEFAULT_STABILISATION_TIME, "failing join validation");
|
||||
try {
|
||||
mockAppender.assertAllExpectationsMatched();
|
||||
} finally {
|
||||
Loggers.removeAppender(joinLogger, mockAppender);
|
||||
mockAppender.stop();
|
||||
}
|
||||
assertTrue(newNode.getLastAppliedClusterState().version() == 0);
|
||||
|
||||
// reset clusterUUIDCommitted (and node / cluster state term) to let node join again
|
||||
// TODO: use elasticsearch-node detach-cluster tool once it's implemented
|
||||
final ClusterNode detachedNode = newNode.restartedNode(
|
||||
metaData -> MetaData.builder(metaData)
|
||||
.clusterUUIDCommitted(false)
|
||||
.coordinationMetaData(CoordinationMetaData.builder(metaData.coordinationMetaData())
|
||||
.term(0L).build())
|
||||
.build(),
|
||||
term -> 0L);
|
||||
cluster1.clusterNodes.replaceAll(cn -> cn == newNode ? detachedNode : cn);
|
||||
cluster1.stabilise();
|
||||
}
|
||||
|
||||
private static long defaultMillis(Setting<TimeValue> setting) {
|
||||
return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY;
|
||||
}
|
||||
|
@ -1077,7 +1137,8 @@ public class CoordinatorTests extends ESTestCase {
|
|||
final Set<String> masterEligibleNodeIds = new HashSet<>(initialNodeCount);
|
||||
clusterNodes = new ArrayList<>(initialNodeCount);
|
||||
for (int i = 0; i < initialNodeCount; i++) {
|
||||
final ClusterNode clusterNode = new ClusterNode(i, allNodesMasterEligible || i == 0 || randomBoolean());
|
||||
final ClusterNode clusterNode = new ClusterNode(nextNodeIndex.getAndIncrement(),
|
||||
allNodesMasterEligible || i == 0 || randomBoolean());
|
||||
clusterNodes.add(clusterNode);
|
||||
if (clusterNode.getLocalNode().isMasterNode()) {
|
||||
masterEligibleNodeIds.add(clusterNode.getId());
|
||||
|
@ -1108,10 +1169,9 @@ public class CoordinatorTests extends ESTestCase {
|
|||
List<ClusterNode> addNodes(int newNodesCount) {
|
||||
logger.info("--> adding {} nodes", newNodesCount);
|
||||
|
||||
final int nodeSizeAtStart = clusterNodes.size();
|
||||
final List<ClusterNode> addedNodes = new ArrayList<>();
|
||||
for (int i = 0; i < newNodesCount; i++) {
|
||||
final ClusterNode clusterNode = new ClusterNode(nodeSizeAtStart + i, true);
|
||||
final ClusterNode clusterNode = new ClusterNode(nextNodeIndex.getAndIncrement(), true);
|
||||
addedNodes.add(clusterNode);
|
||||
}
|
||||
clusterNodes.addAll(addedNodes);
|
||||
|
@ -1471,21 +1531,41 @@ public class CoordinatorTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
MockPersistedState(DiscoveryNode newLocalNode, MockPersistedState oldState) {
|
||||
MockPersistedState(DiscoveryNode newLocalNode, MockPersistedState oldState,
|
||||
Function<MetaData, MetaData> adaptGlobalMetaData, Function<Long, Long> adaptCurrentTerm) {
|
||||
try {
|
||||
if (oldState.nodeEnvironment != null) {
|
||||
nodeEnvironment = oldState.nodeEnvironment;
|
||||
final MetaStateService metaStateService = new MetaStateService(nodeEnvironment, xContentRegistry());
|
||||
final MetaData updatedMetaData = adaptGlobalMetaData.apply(oldState.getLastAcceptedState().metaData());
|
||||
if (updatedMetaData != oldState.getLastAcceptedState().metaData()) {
|
||||
metaStateService.writeGlobalStateAndUpdateManifest("update global state", updatedMetaData);
|
||||
}
|
||||
final long updatedTerm = adaptCurrentTerm.apply(oldState.getCurrentTerm());
|
||||
if (updatedTerm != oldState.getCurrentTerm()) {
|
||||
final Manifest manifest = metaStateService.loadManifestOrEmpty();
|
||||
metaStateService.writeManifestAndCleanup("update term",
|
||||
new Manifest(updatedTerm, manifest.getClusterStateVersion(), manifest.getGlobalGeneration(),
|
||||
manifest.getIndexGenerations()));
|
||||
}
|
||||
delegate = new MockGatewayMetaState(Settings.EMPTY, nodeEnvironment, xContentRegistry(), newLocalNode)
|
||||
.getPersistedState(Settings.EMPTY, null);
|
||||
} else {
|
||||
nodeEnvironment = null;
|
||||
BytesStreamOutput outStream = new BytesStreamOutput();
|
||||
outStream.setVersion(Version.CURRENT);
|
||||
oldState.getLastAcceptedState().writeTo(outStream);
|
||||
final MetaData updatedMetaData = adaptGlobalMetaData.apply(oldState.getLastAcceptedState().metaData());
|
||||
final ClusterState clusterState;
|
||||
if (updatedMetaData != oldState.getLastAcceptedState().metaData()) {
|
||||
clusterState = ClusterState.builder(oldState.getLastAcceptedState()).metaData(updatedMetaData).build();
|
||||
} else {
|
||||
clusterState = oldState.getLastAcceptedState();
|
||||
}
|
||||
clusterState.writeTo(outStream);
|
||||
StreamInput inStream = new NamedWriteableAwareStreamInput(outStream.bytes().streamInput(),
|
||||
new NamedWriteableRegistry(ClusterModule.getNamedWriteables()));
|
||||
delegate = new InMemoryPersistedState(oldState.getCurrentTerm(), ClusterState.readFrom(inStream,
|
||||
newLocalNode)); // adapts it to new localNode instance
|
||||
delegate = new InMemoryPersistedState(adaptCurrentTerm.apply(oldState.getCurrentTerm()),
|
||||
ClusterState.readFrom(inStream, newLocalNode)); // adapts it to new localNode instance
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException("Unable to create MockPersistedState", e);
|
||||
|
@ -1614,12 +1694,17 @@ public class CoordinatorTests extends ESTestCase {
|
|||
}
|
||||
|
||||
ClusterNode restartedNode() {
|
||||
return restartedNode(Function.identity(), Function.identity());
|
||||
}
|
||||
|
||||
ClusterNode restartedNode(Function<MetaData, MetaData> adaptGlobalMetaData, Function<Long, Long> adaptCurrentTerm) {
|
||||
final TransportAddress address = randomBoolean() ? buildNewFakeTransportAddress() : localNode.getAddress();
|
||||
final DiscoveryNode newLocalNode = new DiscoveryNode(localNode.getName(), localNode.getId(),
|
||||
UUIDs.randomBase64UUID(random()), // generated deterministically for repeatable tests
|
||||
address.address().getHostString(), address.getAddress(), address, Collections.emptyMap(),
|
||||
localNode.isMasterNode() ? EnumSet.allOf(Role.class) : emptySet(), Version.CURRENT);
|
||||
return new ClusterNode(nodeIndex, newLocalNode, node -> new MockPersistedState(newLocalNode, persistedState));
|
||||
return new ClusterNode(nodeIndex, newLocalNode,
|
||||
node -> new MockPersistedState(newLocalNode, persistedState, adaptGlobalMetaData, adaptCurrentTerm));
|
||||
}
|
||||
|
||||
private PersistedState getPersistedState() {
|
||||
|
|
|
@ -43,7 +43,7 @@ public class JoinHelperTests extends ESTestCase {
|
|||
TransportService transportService = capturingTransport.createTransportService(Settings.EMPTY,
|
||||
deterministicTaskQueue.getThreadPool(), TransportService.NOOP_TRANSPORT_INTERCEPTOR,
|
||||
x -> localNode, null, Collections.emptySet());
|
||||
JoinHelper joinHelper = new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L,
|
||||
JoinHelper joinHelper = new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> null,
|
||||
(joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); },
|
||||
Collections.emptyList());
|
||||
transportService.start();
|
||||
|
|
|
@ -411,6 +411,43 @@ public class MetaDataTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testXContentClusterUUID() throws IOException {
|
||||
final MetaData originalMeta = MetaData.builder().clusterUUID(UUIDs.randomBase64UUID())
|
||||
.clusterUUIDCommitted(randomBoolean()).build();
|
||||
final XContentBuilder builder = JsonXContent.contentBuilder();
|
||||
builder.startObject();
|
||||
originalMeta.toXContent(builder, ToXContent.EMPTY_PARAMS);
|
||||
builder.endObject();
|
||||
try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) {
|
||||
final MetaData fromXContentMeta = MetaData.fromXContent(parser);
|
||||
assertThat(fromXContentMeta.clusterUUID(), equalTo(originalMeta.clusterUUID()));
|
||||
assertThat(fromXContentMeta.clusterUUIDCommitted(), equalTo(originalMeta.clusterUUIDCommitted()));
|
||||
}
|
||||
}
|
||||
|
||||
public void testSerializationClusterUUID() throws IOException {
|
||||
final MetaData originalMeta = MetaData.builder().clusterUUID(UUIDs.randomBase64UUID())
|
||||
.clusterUUIDCommitted(randomBoolean()).build();
|
||||
final BytesStreamOutput out = new BytesStreamOutput();
|
||||
originalMeta.writeTo(out);
|
||||
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(ClusterModule.getNamedWriteables());
|
||||
final MetaData fromStreamMeta = MetaData.readFrom(
|
||||
new NamedWriteableAwareStreamInput(out.bytes().streamInput(), namedWriteableRegistry)
|
||||
);
|
||||
assertThat(fromStreamMeta.clusterUUID(), equalTo(originalMeta.clusterUUID()));
|
||||
assertThat(fromStreamMeta.clusterUUIDCommitted(), equalTo(originalMeta.clusterUUIDCommitted()));
|
||||
}
|
||||
|
||||
public void testMetaDataGlobalStateChangesOnClusterUUIDChanges() {
|
||||
final MetaData metaData1 = MetaData.builder().clusterUUID(UUIDs.randomBase64UUID()).clusterUUIDCommitted(randomBoolean()).build();
|
||||
final MetaData metaData2 = MetaData.builder(metaData1).clusterUUID(UUIDs.randomBase64UUID()).build();
|
||||
final MetaData metaData3 = MetaData.builder(metaData1).clusterUUIDCommitted(!metaData1.clusterUUIDCommitted()).build();
|
||||
assertFalse(MetaData.isGlobalStateEquals(metaData1, metaData2));
|
||||
assertFalse(MetaData.isGlobalStateEquals(metaData1, metaData3));
|
||||
final MetaData metaData4 = MetaData.builder(metaData2).clusterUUID(metaData1.clusterUUID()).build();
|
||||
assertTrue(MetaData.isGlobalStateEquals(metaData1, metaData4));
|
||||
}
|
||||
|
||||
private static CoordinationMetaData.VotingConfiguration randomVotingConfig() {
|
||||
return new CoordinationMetaData.VotingConfiguration(Sets.newHashSet(generateRandomStringArray(randomInt(10), 20, false)));
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.elasticsearch.action.index.IndexResponse;
|
|||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
|
@ -377,6 +378,33 @@ public class ClusterDisruptionIT extends AbstractDisruptionTestCase {
|
|||
assertTrue(client().prepareGet("index", "_doc", "1").get().isExists());
|
||||
}
|
||||
|
||||
public void testCannotJoinIfMasterLostDataFolder() throws Exception {
|
||||
String masterNode = internalCluster().startMasterOnlyNode();
|
||||
String dataNode = internalCluster().startDataOnlyNode();
|
||||
|
||||
internalCluster().restartNode(masterNode, new InternalTestCluster.RestartCallback() {
|
||||
@Override
|
||||
public boolean clearData(String nodeName) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Settings onNodeStopped(String nodeName) {
|
||||
return Settings.builder().put(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(), nodeName).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean validateClusterForming() {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
|
||||
assertFalse(internalCluster().client(masterNode).admin().cluster().prepareHealth().get().isTimedOut());
|
||||
assertTrue(internalCluster().client(masterNode).admin().cluster().prepareHealth().setWaitForNodes("2").setTimeout("2s").get()
|
||||
.isTimedOut());
|
||||
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataNode)); // otherwise we will fail during clean-up
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that indices are properly deleted even if there is a master transition in between.
|
||||
* Test for https://github.com/elastic/elasticsearch/issues/11665
|
||||
|
|
|
@ -263,12 +263,12 @@ public class ClusterStateUpdatersTests extends ESTestCase {
|
|||
.blocks(ClusterBlocks.builder().addGlobalBlock(CLUSTER_READ_ONLY_BLOCK).build())
|
||||
.metaData(metaData)
|
||||
.build();
|
||||
assertThat(recoveredState.metaData().clusterUUID(), equalTo("_na_"));
|
||||
assertThat(recoveredState.metaData().clusterUUID(), equalTo(MetaData.UNKNOWN_CLUSTER_UUID));
|
||||
|
||||
final ClusterState updatedState = mixCurrentStateAndRecoveredState(currentState, recoveredState);
|
||||
|
||||
assertThat(updatedState.metaData().clusterUUID(), not(equalTo("_na_")));
|
||||
assertTrue(MetaData.isGlobalStateEquals(metaData, updatedState.metaData()));
|
||||
assertThat(updatedState.metaData().clusterUUID(), not(equalTo(MetaData.UNKNOWN_CLUSTER_UUID)));
|
||||
assertFalse(MetaData.isGlobalStateEquals(metaData, updatedState.metaData()));
|
||||
assertThat(updatedState.metaData().index("test"), equalTo(indexMetaData));
|
||||
assertTrue(updatedState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK));
|
||||
assertTrue(updatedState.blocks().hasGlobalBlock(CLUSTER_READ_ONLY_BLOCK));
|
||||
|
|
|
@ -213,22 +213,24 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
|
|||
} while (coordinationMetaData.getLastAcceptedConfiguration().equals(coordinationMetaData.getLastCommittedConfiguration()));
|
||||
|
||||
ClusterState state = createClusterState(randomNonNegativeLong(),
|
||||
MetaData.builder().coordinationMetaData(coordinationMetaData).build());
|
||||
MetaData.builder().coordinationMetaData(coordinationMetaData)
|
||||
.clusterUUID(randomAlphaOfLength(10)).build());
|
||||
gateway.setLastAcceptedState(state);
|
||||
|
||||
gateway = maybeNew(gateway);
|
||||
assertThat(gateway.getLastAcceptedState().getLastAcceptedConfiguration(),
|
||||
not(equalTo(gateway.getLastAcceptedState().getLastCommittedConfiguration())));
|
||||
gateway.markLastAcceptedConfigAsCommitted();
|
||||
gateway.markLastAcceptedStateAsCommitted();
|
||||
|
||||
CoordinationMetaData expectedCoordinationMetaData = CoordinationMetaData.builder(coordinationMetaData)
|
||||
.lastCommittedConfiguration(coordinationMetaData.getLastAcceptedConfiguration()).build();
|
||||
ClusterState expectedClusterState =
|
||||
ClusterState.builder(state).metaData(MetaData.builder().coordinationMetaData(expectedCoordinationMetaData).build()).build();
|
||||
ClusterState.builder(state).metaData(MetaData.builder().coordinationMetaData(expectedCoordinationMetaData)
|
||||
.clusterUUID(state.metaData().clusterUUID()).clusterUUIDCommitted(true).build()).build();
|
||||
|
||||
gateway = maybeNew(gateway);
|
||||
assertClusterStateEqual(expectedClusterState, gateway.getLastAcceptedState());
|
||||
gateway.markLastAcceptedConfigAsCommitted();
|
||||
gateway.markLastAcceptedStateAsCommitted();
|
||||
|
||||
gateway = maybeNew(gateway);
|
||||
assertClusterStateEqual(expectedClusterState, gateway.getLastAcceptedState());
|
||||
|
|
Loading…
Reference in New Issue