Remove mutable status field from cluster state (#21379)

The ClusterState class currently has a mutable volatile field "status" that is only used by the ClusterStateObserver to differentiate between a cluster state that is being applied or one that has already been applied. This commit removes the field from cluster state, making it a truly immutable class. This information is stored instead by ClusterService, which is the only place that should update this field (PublishClusterStateAction was also updating it, but that information was never used anywhere). A new class is introduced (ClusterServiceState) which emcompasses the current cluster state as well as the current status, which is only used by the ClusterStateObserver mechanism.
This commit is contained in:
Yannick Welsch 2016-11-08 14:37:09 +01:00 committed by GitHub
parent 6f6c633298
commit 14a0e8ee57
19 changed files with 185 additions and 132 deletions

View File

@ -34,6 +34,8 @@ import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.ClusterServiceState;
import org.elasticsearch.cluster.service.ClusterStateStatus;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -140,7 +142,8 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
assert waitFor >= 0;
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger, threadPool.getThreadContext());
final ClusterState state = observer.observedState();
final ClusterServiceState observedState = observer.observedState();
final ClusterState state = observedState.getClusterState();
if (request.timeout().millis() == 0) {
listener.onResponse(getResponse(request, state, waitFor, request.timeout().millis() == 0));
return;
@ -148,8 +151,8 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
final int concreteWaitFor = waitFor;
final ClusterStateObserver.ChangePredicate validationPredicate = new ClusterStateObserver.ValidationPredicate() {
@Override
protected boolean validate(ClusterState newState) {
return newState.status() == ClusterState.ClusterStateStatus.APPLIED && validateRequest(request, newState, concreteWaitFor);
protected boolean validate(ClusterServiceState newState) {
return newState.getClusterStateStatus() == ClusterStateStatus.APPLIED && validateRequest(request, newState.getClusterState(), concreteWaitFor);
}
};
@ -171,7 +174,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
listener.onResponse(response);
}
};
if (state.status() == ClusterState.ClusterStateStatus.APPLIED && validateRequest(request, state, concreteWaitFor)) {
if (observedState.getClusterStateStatus() == ClusterStateStatus.APPLIED && validateRequest(request, state, concreteWaitFor)) {
stateListener.onNewClusterState(state);
} else {
observer.waitForNextChange(stateListener, validationPredicate, request.timeout());

View File

@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.ClusterServiceState;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -69,14 +70,14 @@ public class ActiveShardsObserver extends AbstractComponent {
}
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger, threadPool.getThreadContext());
if (activeShardCount.enoughShardsActive(observer.observedState(), indexName)) {
if (activeShardCount.enoughShardsActive(observer.observedState().getClusterState(), indexName)) {
onResult.accept(true);
} else {
final ClusterStateObserver.ChangePredicate shardsAllocatedPredicate =
new ClusterStateObserver.ValidationPredicate() {
@Override
protected boolean validate(final ClusterState newState) {
return activeShardCount.enoughShardsActive(newState, indexName);
protected boolean validate(final ClusterServiceState newState) {
return activeShardCount.enoughShardsActive(newState.getClusterState(), indexName);
}
};

View File

@ -34,6 +34,7 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.ClusterServiceState;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
@ -112,8 +113,8 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
private final ClusterStateObserver.ChangePredicate retryableOrNoBlockPredicate = new ClusterStateObserver.ValidationPredicate() {
@Override
protected boolean validate(ClusterState newState) {
ClusterBlockException blockException = checkBlock(request, newState);
protected boolean validate(ClusterServiceState newState) {
ClusterBlockException blockException = checkBlock(request, newState.getClusterState());
return (blockException == null || !blockException.retryable());
}
};
@ -133,7 +134,7 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
}
protected void doStart() {
final ClusterState clusterState = observer.observedState();
final ClusterState clusterState = observer.observedState().getClusterState();
final DiscoveryNodes nodes = clusterState.nodes();
if (nodes.isLocalNodeElectedMaster() || localExecute(request)) {
// check for block, if blocked, retry, else, execute locally

View File

@ -622,7 +622,7 @@ public abstract class TransportReplicationAction<
@Override
protected void doRun() {
setPhase(task, "routing");
final ClusterState state = observer.observedState();
final ClusterState state = observer.observedState().getClusterState();
if (handleBlockExceptions(state)) {
return;
}

View File

@ -124,9 +124,10 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
}
protected void doStart() {
nodes = observer.observedState().nodes();
final ClusterState clusterState = observer.observedState().getClusterState();
nodes = clusterState.nodes();
try {
ClusterBlockException blockException = checkGlobalBlock(observer.observedState());
ClusterBlockException blockException = checkGlobalBlock(clusterState);
if (blockException != null) {
if (blockException.retryable()) {
retry(blockException);
@ -135,9 +136,9 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
throw blockException;
}
}
request.concreteIndex(indexNameExpressionResolver.concreteSingleIndex(observer.observedState(), request).getName());
resolveRequest(observer.observedState(), request);
blockException = checkRequestBlock(observer.observedState(), request);
request.concreteIndex(indexNameExpressionResolver.concreteSingleIndex(clusterState, request).getName());
resolveRequest(clusterState, request);
blockException = checkRequestBlock(clusterState, request);
if (blockException != null) {
if (blockException.retryable()) {
retry(blockException);
@ -146,7 +147,7 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
throw blockException;
}
}
shardIt = shards(observer.observedState(), request);
shardIt = shards(clusterState, request);
} catch (Exception e) {
listener.onFailure(e);
return;

View File

@ -64,10 +64,9 @@ import java.util.Set;
/**
* Represents the current state of the cluster.
* <p>
* The cluster state object is immutable with an
* exception of the {@link RoutingNodes} structure, which is built on demand from the {@link RoutingTable},
* and cluster state {@link #status}, which is updated during cluster state publishing and applying
* processing. The cluster state can be updated only on the master node. All updates are performed by on a
* The cluster state object is immutable with an exception of the {@link RoutingNodes} structure, which is
* built on demand from the {@link RoutingTable}.
* The cluster state can be updated only on the master node. All updates are performed by on a
* single thread and controlled by the {@link ClusterService}. After every update the
* {@link Discovery#publish} method publishes new version of the cluster state to all other nodes in the
* cluster. The actual publishing mechanism is delegated to the {@link Discovery#publish} method and depends on
@ -89,23 +88,6 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
public static final ClusterState PROTO = builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build();
public static enum ClusterStateStatus {
UNKNOWN((byte) 0),
RECEIVED((byte) 1),
BEING_APPLIED((byte) 2),
APPLIED((byte) 3);
private final byte id;
ClusterStateStatus(byte id) {
this.id = id;
}
public byte id() {
return this.id;
}
}
public interface Custom extends Diffable<Custom>, ToXContent {
String type();
@ -166,8 +148,6 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
// built on demand
private volatile RoutingNodes routingNodes;
private volatile ClusterStateStatus status;
public ClusterState(long version, String stateUUID, ClusterState state) {
this(state.clusterName, version, stateUUID, state.metaData(), state.routingTable(), state.nodes(), state.blocks(), state.customs(), false);
}
@ -181,19 +161,9 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
this.nodes = nodes;
this.blocks = blocks;
this.customs = customs;
this.status = ClusterStateStatus.UNKNOWN;
this.wasReadFromDiff = wasReadFromDiff;
}
public ClusterStateStatus status() {
return status;
}
public ClusterState status(ClusterStateStatus newStatus) {
this.status = newStatus;
return this;
}
public long version() {
return this.version;
}

View File

@ -22,6 +22,8 @@ package org.elasticsearch.cluster;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.ClusterStateStatus;
import org.elasticsearch.cluster.service.ClusterServiceState;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
@ -50,7 +52,7 @@ public class ClusterStateObserver {
volatile TimeValue timeOutValue;
final AtomicReference<ObservedState> lastObservedState;
final AtomicReference<ClusterServiceState> lastObservedState;
final TimeoutClusterStateListener clusterStateListener = new ObserverClusterStateListener();
// observingContext is not null when waiting on cluster state changes
final AtomicReference<ObservingContext> observingContext = new AtomicReference<>(null);
@ -69,7 +71,7 @@ public class ClusterStateObserver {
*/
public ClusterStateObserver(ClusterService clusterService, @Nullable TimeValue timeout, Logger logger, ThreadContext contextHolder) {
this.clusterService = clusterService;
this.lastObservedState = new AtomicReference<>(new ObservedState(clusterService.state()));
this.lastObservedState = new AtomicReference<>(clusterService.clusterServiceState());
this.timeOutValue = timeout;
if (timeOutValue != null) {
this.startTimeNS = System.nanoTime();
@ -78,11 +80,11 @@ public class ClusterStateObserver {
this.contextHolder = contextHolder;
}
/** last cluster state observer by this observer. Note that this may not be the current one */
public ClusterState observedState() {
ObservedState state = lastObservedState.get();
/** last cluster state and status observed by this observer. Note that this may not be the current one */
public ClusterServiceState observedState() {
ClusterServiceState state = lastObservedState.get();
assert state != null;
return state.clusterState;
return state;
}
/** indicates whether this observer has timedout */
@ -126,7 +128,7 @@ public class ClusterStateObserver {
logger.trace("observer timed out. notifying listener. timeout setting [{}], time since start [{}]", timeOutValue, new TimeValue(timeSinceStartMS));
// update to latest, in case people want to retry
timedOut = true;
lastObservedState.set(new ObservedState(clusterService.state()));
lastObservedState.set(clusterService.clusterServiceState());
listener.onTimeout(timeOutValue);
return;
}
@ -141,13 +143,13 @@ public class ClusterStateObserver {
}
// sample a new state
ObservedState newState = new ObservedState(clusterService.state());
ObservedState lastState = lastObservedState.get();
if (changePredicate.apply(lastState.clusterState, lastState.status, newState.clusterState, newState.status)) {
ClusterServiceState newState = clusterService.clusterServiceState();
ClusterServiceState lastState = lastObservedState.get();
if (changePredicate.apply(lastState, newState)) {
// good enough, let's go.
logger.trace("observer: sampled state accepted by predicate ({})", newState);
lastObservedState.set(newState);
listener.onNewClusterState(newState.clusterState);
listener.onNewClusterState(newState.getClusterState());
} else {
logger.trace("observer: sampled state rejected by predicate ({}). adding listener to ClusterService", newState);
ObservingContext context = new ObservingContext(new ContextPreservingListener(listener, contextHolder.newStoredContext()), changePredicate);
@ -161,11 +163,11 @@ public class ClusterStateObserver {
/**
* reset this observer to the give cluster state. Any pending waits will be canceled.
*/
public void reset(ClusterState toState) {
public void reset(ClusterServiceState state) {
if (observingContext.getAndSet(null) != null) {
clusterService.remove(clusterStateListener);
}
lastObservedState.set(new ObservedState(toState));
lastObservedState.set(state);
}
class ObserverClusterStateListener implements TimeoutClusterStateListener {
@ -180,10 +182,10 @@ public class ClusterStateObserver {
if (context.changePredicate.apply(event)) {
if (observingContext.compareAndSet(context, null)) {
clusterService.remove(this);
ObservedState state = new ObservedState(event.state());
ClusterServiceState state = new ClusterServiceState(event.state(), ClusterStateStatus.APPLIED);
logger.trace("observer: accepting cluster state change ({})", state);
lastObservedState.set(state);
context.listener.onNewClusterState(state.clusterState);
context.listener.onNewClusterState(state.getClusterState());
} else {
logger.trace("observer: predicate approved change but observing context has changed - ignoring (new cluster state version [{}])", event.state().version());
}
@ -199,15 +201,15 @@ public class ClusterStateObserver {
// No need to remove listener as it is the responsibility of the thread that set observingContext to null
return;
}
ObservedState newState = new ObservedState(clusterService.state());
ObservedState lastState = lastObservedState.get();
if (context.changePredicate.apply(lastState.clusterState, lastState.status, newState.clusterState, newState.status)) {
ClusterServiceState newState = clusterService.clusterServiceState();
ClusterServiceState lastState = lastObservedState.get();
if (context.changePredicate.apply(lastState, newState)) {
// double check we're still listening
if (observingContext.compareAndSet(context, null)) {
logger.trace("observer: post adding listener: accepting current cluster state ({})", newState);
clusterService.remove(this);
lastObservedState.set(newState);
context.listener.onNewClusterState(newState.clusterState);
context.listener.onNewClusterState(newState.getClusterState());
} else {
logger.trace("observer: postAdded - predicate approved state but observing context has changed - ignoring ({})", newState);
}
@ -235,7 +237,7 @@ public class ClusterStateObserver {
long timeSinceStartMS = TimeValue.nsecToMSec(System.nanoTime() - startTimeNS);
logger.trace("observer: timeout notification from cluster service. timeout setting [{}], time since start [{}]", timeOutValue, new TimeValue(timeSinceStartMS));
// update to latest, in case people want to retry
lastObservedState.set(new ObservedState(clusterService.state()));
lastObservedState.set(clusterService.clusterServiceState());
timedOut = true;
context.listener.onTimeout(timeOutValue);
}
@ -260,10 +262,8 @@ public class ClusterStateObserver {
*
* @return true if newState should be accepted
*/
boolean apply(ClusterState previousState,
ClusterState.ClusterStateStatus previousStatus,
ClusterState newState,
ClusterState.ClusterStateStatus newStatus);
boolean apply(ClusterServiceState previousState,
ClusterServiceState newState);
/**
* called to see whether a cluster change should be accepted
@ -277,22 +277,25 @@ public class ClusterStateObserver {
public abstract static class ValidationPredicate implements ChangePredicate {
@Override
public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus, ClusterState newState, ClusterState.ClusterStateStatus newStatus) {
return (previousState != newState || previousStatus != newStatus) && validate(newState);
public boolean apply(ClusterServiceState previousState, ClusterServiceState newState) {
return (previousState.getClusterState() != newState.getClusterState() ||
previousState.getClusterStateStatus() != newState.getClusterStateStatus()) &&
validate(newState);
}
protected abstract boolean validate(ClusterState newState);
protected abstract boolean validate(ClusterServiceState newState);
@Override
public boolean apply(ClusterChangedEvent changedEvent) {
return changedEvent.previousState().version() != changedEvent.state().version() && validate(changedEvent.state());
return changedEvent.previousState().version() != changedEvent.state().version() &&
validate(new ClusterServiceState(changedEvent.state(), ClusterStateStatus.APPLIED));
}
}
public abstract static class EventPredicate implements ChangePredicate {
@Override
public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus, ClusterState newState, ClusterState.ClusterStateStatus newStatus) {
return previousState != newState || previousStatus != newStatus;
public boolean apply(ClusterServiceState previousState, ClusterServiceState newState) {
return previousState.getClusterState() != newState.getClusterState() || previousState.getClusterStateStatus() != newState.getClusterStateStatus();
}
}
@ -307,21 +310,6 @@ public class ClusterStateObserver {
}
}
static class ObservedState {
public final ClusterState clusterState;
public final ClusterState.ClusterStateStatus status;
public ObservedState(ClusterState clusterState) {
this.clusterState = clusterState;
this.status = clusterState.status();
}
@Override
public String toString() {
return "version [" + clusterState.version() + "], status [" + status + "]";
}
}
private static final class ContextPreservingListener implements Listener {
private final Listener delegate;
private final ThreadContext.StoredContext tempContext;

View File

@ -19,18 +19,19 @@
package org.elasticsearch.cluster;
import org.elasticsearch.cluster.service.ClusterServiceState;
public enum MasterNodeChangePredicate implements ClusterStateObserver.ChangePredicate {
INSTANCE;
@Override
public boolean apply(
ClusterState previousState,
ClusterState.ClusterStateStatus previousStatus,
ClusterState newState,
ClusterState.ClusterStateStatus newStatus) {
ClusterServiceState previousState,
ClusterServiceState newState) {
// checking if the masterNodeId changed is insufficient as the
// same master node might get re-elected after a disruption
return newState.nodes().getMasterNodeId() != null && newState != previousState;
return newState.getClusterState().nodes().getMasterNodeId() != null &&
newState.getClusterState() != previousState.getClusterState();
}
@Override

View File

@ -92,7 +92,7 @@ public class ShardStateAction extends AbstractComponent {
}
private void sendShardAction(final String actionName, final ClusterStateObserver observer, final ShardEntry shardEntry, final Listener listener) {
DiscoveryNode masterNode = observer.observedState().nodes().getMasterNode();
DiscoveryNode masterNode = observer.observedState().getClusterState().nodes().getMasterNode();
if (masterNode == null) {
logger.warn("{} no master known for action [{}] for shard entry [{}]", shardEntry.shardId, actionName, shardEntry);
waitForNewMasterAndRetry(actionName, observer, shardEntry, listener);

View File

@ -81,6 +81,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@ -122,7 +123,7 @@ public class ClusterService extends AbstractLifecycleComponent {
private final Queue<NotifyTimeout> onGoingTimeouts = ConcurrentCollections.newQueue();
private volatile ClusterState clusterState;
private final AtomicReference<ClusterServiceState> state = new AtomicReference<>();
private final ClusterBlocks.Builder initialBlocks;
@ -136,7 +137,7 @@ public class ClusterService extends AbstractLifecycleComponent {
this.clusterSettings = clusterSettings;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
// will be replaced on doStart.
this.clusterState = ClusterState.builder(clusterName).build();
this.state.set(new ClusterServiceState(ClusterState.builder(clusterName).build(), ClusterStateStatus.UNKNOWN));
this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
this::setSlowTaskLoggingThreshold);
@ -157,9 +158,12 @@ public class ClusterService extends AbstractLifecycleComponent {
}
public synchronized void setLocalNode(DiscoveryNode localNode) {
assert clusterState.nodes().getLocalNodeId() == null : "local node is already set";
DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()).add(localNode).localNodeId(localNode.getId());
this.clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build();
assert state.get().getClusterState().nodes().getLocalNodeId() == null : "local node is already set";
this.state.getAndUpdate(css -> {
ClusterState clusterState = css.getClusterState();
DiscoveryNodes nodes = DiscoveryNodes.builder(clusterState.nodes()).add(localNode).localNodeId(localNode.getId()).build();
return new ClusterServiceState(ClusterState.builder(clusterState).nodes(nodes).build(), css.getClusterStateStatus());
});
}
public synchronized void setNodeConnectionsService(NodeConnectionsService nodeConnectionsService) {
@ -197,13 +201,14 @@ public class ClusterService extends AbstractLifecycleComponent {
@Override
protected synchronized void doStart() {
Objects.requireNonNull(clusterStatePublisher, "please set a cluster state publisher before starting");
Objects.requireNonNull(clusterState.nodes().getLocalNode(), "please set the local node before starting");
Objects.requireNonNull(state.get().getClusterState().nodes().getLocalNode(), "please set the local node before starting");
Objects.requireNonNull(nodeConnectionsService, "please set the node connection service before starting");
add(localNodeMasterListeners);
this.clusterState = ClusterState.builder(clusterState).blocks(initialBlocks).build();
this.state.getAndUpdate(css -> new ClusterServiceState(
ClusterState.builder(css.getClusterState()).blocks(initialBlocks).build(),
css.getClusterStateStatus()));
this.updateTasksExecutor = EsExecutors.newSinglePrioritizing(UPDATE_THREAD_NAME, daemonThreadFactory(settings, UPDATE_THREAD_NAME),
threadPool.getThreadContext());
this.clusterState = ClusterState.builder(clusterState).blocks(initialBlocks).build();
}
@Override
@ -235,7 +240,7 @@ public class ClusterService extends AbstractLifecycleComponent {
* The local node.
*/
public DiscoveryNode localNode() {
DiscoveryNode localNode = clusterState.getNodes().getLocalNode();
DiscoveryNode localNode = state.get().getClusterState().getNodes().getLocalNode();
if (localNode == null) {
throw new IllegalStateException("No local node found. Is the node started?");
}
@ -247,10 +252,17 @@ public class ClusterService extends AbstractLifecycleComponent {
}
/**
* The current state.
* The current cluster state.
*/
public ClusterState state() {
return this.clusterState;
return this.state.get().getClusterState();
}
/**
* The current cluster service state comprising cluster state and cluster state status.
*/
public ClusterServiceState clusterServiceState() {
return this.state.get();
}
/**
@ -308,7 +320,7 @@ public class ClusterService extends AbstractLifecycleComponent {
/**
* Adds a cluster state listener that will timeout after the provided timeout,
* and is executed after the clusterstate has been successfully applied ie. is
* in state {@link org.elasticsearch.cluster.ClusterState.ClusterStateStatus#APPLIED}
* in state {@link ClusterStateStatus#APPLIED}
* NOTE: a {@code null} timeout means that the listener will never be removed
* automatically
*/
@ -542,7 +554,7 @@ public class ClusterService extends AbstractLifecycleComponent {
return;
}
logger.debug("processing [{}]: execute", tasksSummary);
ClusterState previousClusterState = clusterState;
ClusterState previousClusterState = state.get().getClusterState();
if (!previousClusterState.nodes().isLocalNodeElectedMaster() && executor.runOnlyOnMaster()) {
logger.debug("failing [{}]: local node is no longer master", tasksSummary);
toExecute.stream().forEach(task -> task.listener.onNoLongerMaster(task.source));
@ -653,8 +665,6 @@ public class ClusterService extends AbstractLifecycleComponent {
}
final Discovery.AckListener ackListener = new DelegetingAckListener(ackListeners);
newClusterState.status(ClusterState.ClusterStateStatus.BEING_APPLIED);
if (logger.isTraceEnabled()) {
logger.trace("cluster state updated, source [{}]\n{}", tasksSummary, newClusterState);
} else if (logger.isDebugEnabled()) {
@ -694,7 +704,7 @@ public class ClusterService extends AbstractLifecycleComponent {
}
// update the current cluster state
clusterState = newClusterState;
state.set(new ClusterServiceState(newClusterState, ClusterStateStatus.BEING_APPLIED));
logger.debug("set local cluster state to version {}", newClusterState.version());
try {
// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
@ -715,7 +725,7 @@ public class ClusterService extends AbstractLifecycleComponent {
nodeConnectionsService.disconnectFromNodes(clusterChangedEvent.nodesDelta().removedNodes());
newClusterState.status(ClusterState.ClusterStateStatus.APPLIED);
state.getAndUpdate(css -> new ClusterServiceState(css.getClusterState(), ClusterStateStatus.APPLIED));
for (ClusterStateListener listener : postAppliedListeners) {
try {

View File

@ -0,0 +1,49 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.service;
import org.elasticsearch.cluster.ClusterState;
/**
* A simple immutable container class that comprises a cluster state and cluster state status. Used by {@link ClusterService}
* to provide a snapshot view on which cluster state is currently being applied / already applied.
*/
public class ClusterServiceState {
private final ClusterState clusterState;
private final ClusterStateStatus clusterStateStatus;
public ClusterServiceState(ClusterState clusterState, ClusterStateStatus clusterStateStatus) {
this.clusterState = clusterState;
this.clusterStateStatus = clusterStateStatus;
}
public ClusterState getClusterState() {
return clusterState;
}
public ClusterStateStatus getClusterStateStatus() {
return clusterStateStatus;
}
@Override
public String toString() {
return "version [" + clusterState.version() + "], status [" + clusterStateStatus + "]";
}
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.service;
public enum ClusterStateStatus {
UNKNOWN,
BEING_APPLIED,
APPLIED;
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.IncompatibleClusterStateVersionException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterStateStatus;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.Compressor;
@ -397,7 +398,6 @@ public class PublishClusterStateAction extends AbstractComponent {
pendingStatesQueue.addPending(incomingState);
lastSeenClusterState = incomingState;
lastSeenClusterState.status(ClusterState.ClusterStateStatus.RECEIVED);
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

View File

@ -31,6 +31,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.ClusterServiceState;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
@ -399,7 +400,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
private void waitForClusterState(long clusterStateVersion) {
ClusterStateObserver observer = new ClusterStateObserver(clusterService, TimeValue.timeValueMinutes(5), logger,
threadPool.getThreadContext());
final ClusterState clusterState = observer.observedState();
final ClusterState clusterState = observer.observedState().getClusterState();
if (clusterState.getVersion() >= clusterStateVersion) {
logger.trace("node has cluster state with version higher than {} (current: {})", clusterStateVersion,
clusterState.getVersion());
@ -426,20 +427,20 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
}, new ClusterStateObserver.ValidationPredicate() {
@Override
protected boolean validate(ClusterState newState) {
return newState.getVersion() >= clusterStateVersion;
protected boolean validate(ClusterServiceState newState) {
return newState.getClusterState().getVersion() >= clusterStateVersion;
}
});
try {
future.get();
logger.trace("successfully waited for cluster state with version {} (current: {})", clusterStateVersion,
observer.observedState().getVersion());
observer.observedState().getClusterState().getVersion());
} catch (Exception e) {
logger.debug(
(Supplier<?>) () -> new ParameterizedMessage(
"failed waiting for cluster state with version {} (current: {})",
clusterStateVersion,
observer.observedState()),
observer.observedState().getClusterState().getVersion()),
e);
throw ExceptionsHelper.convertToRuntime(e);
}

View File

@ -33,6 +33,7 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.ClusterServiceState;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@ -334,7 +335,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
}
}, new ClusterStateObserver.ValidationPredicate() {
@Override
protected boolean validate(ClusterState newState) {
protected boolean validate(ClusterServiceState newState) {
// the shard is not there in which case we want to send back a false (shard is not active), so the cluster state listener must be notified
// or the shard is active in which case we want to send back that the shard is active
// here we could also evaluate the cluster state and get the information from there. we

View File

@ -590,7 +590,7 @@ public class Node implements Closeable {
if (DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings).millis() > 0) {
final ThreadPool thread = injector.getInstance(ThreadPool.class);
ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, thread.getThreadContext());
if (observer.observedState().nodes().getMasterNodeId() == null) {
if (observer.observedState().getClusterState().nodes().getMasterNodeId() == null) {
final CountDownLatch latch = new CountDownLatch(1);
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override

View File

@ -23,11 +23,11 @@ import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.service.ClusterStateStatus;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESBackcompatTestCase;
import org.elasticsearch.transport.MockTransportClient;
@ -50,7 +50,6 @@ public class ClusterStateBackwardsCompatIT extends ESBackcompatTestCase {
tc.addTransportAddress(n.getNode().getAddress());
ClusterStateResponse response = tc.admin().cluster().prepareState().execute().actionGet();
assertThat(response.getState().status(), equalTo(ClusterState.ClusterStateStatus.UNKNOWN));
assertNotNull(response.getClusterName());
assertTrue(response.getState().getMetaData().hasIndex("test"));
}

View File

@ -535,7 +535,7 @@ public class ClusterServiceIT extends ESIntegTestCase {
// there should not be any master as the minimum number of required eligible masters is not met
awaitBusy(() -> clusterService1.state().nodes().getMasterNode() == null &&
clusterService1.state().status() == ClusterState.ClusterStateStatus.APPLIED);
clusterService1.clusterServiceState().getClusterStateStatus() == ClusterStateStatus.APPLIED);
assertThat(testService1.master(), is(false));
// bring the node back up

View File

@ -43,6 +43,8 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.ClusterStateStatus;
import org.elasticsearch.cluster.service.ClusterServiceState;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
@ -1201,9 +1203,9 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
// Don't restart the master node until we know the index deletion has taken effect on master and the master eligible node.
assertBusy(() -> {
for (String masterNode : allMasterEligibleNodes) {
final ClusterState masterState = internalCluster().clusterService(masterNode).state();
assertTrue("index not deleted on " + masterNode, masterState.metaData().hasIndex(idxName) == false &&
masterState.status() == ClusterState.ClusterStateStatus.APPLIED);
final ClusterServiceState masterState = internalCluster().clusterService(masterNode).clusterServiceState();
assertTrue("index not deleted on " + masterNode, masterState.getClusterState().metaData().hasIndex(idxName) == false &&
masterState.getClusterStateStatus() == ClusterStateStatus.APPLIED);
}
});
internalCluster().restartNode(masterNode1, InternalTestCluster.EMPTY_CALLBACK);