A new ClusterStateStatus to indicate cluster state life cycles

When the ClusterService applies a new cluster state, it is first assigned as the new active one and then all listeners are called. Some of ES's features sample the current state and try to take action on it (for example index a document). If that fails, they will wait for change in the cluster state and try again (for example, wait for a shard to start and try indexing again).

If you're unlucky you sample the state after it has been assigned as the "active" state but before all listeners has done the work. In this cases the action take (i.e., indexing a doc) will still fail (as the shard is not yet started) but waiting for a new state may take a long time or fail.

This commit adds a new ClusterStateStatus that allows to better track the stages a cluster state goes through (currently `RECEIVED`, `BEING_APPLIED` & `APPLIED`). This allows detecting that a cluster state is not yet fully applied and retry without waiting for a new state to arrive.

This commit also adds a utility class , ClusterStateObserver, to make this pattern slightly simpler and avoid common pit falls.

Closes #5741
This commit is contained in:
Boaz Leskes 2014-03-28 16:04:03 +01:00
parent 41cc1f5bcb
commit 1434f6bcbb
10 changed files with 548 additions and 228 deletions

View File

@ -26,7 +26,7 @@ import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.TimeoutClusterStateListener;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.Settings;
@ -91,11 +91,11 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
@Override
protected void doExecute(final Request request, final ActionListener<Response> listener) {
innerExecute(request, listener, false);
innerExecute(request, listener, new ClusterStateObserver(clusterService, request.masterNodeTimeout(), logger), false);
}
private void innerExecute(final Request request, final ActionListener<Response> listener, final boolean retrying) {
final ClusterState clusterState = clusterService.state();
private void innerExecute(final Request request, final ActionListener<Response> listener, final ClusterStateObserver observer, final boolean retrying) {
final ClusterState clusterState = observer.observedState();
final DiscoveryNodes nodes = clusterState.nodes();
if (nodes.localNodeMaster() || localExecute(request)) {
// check for block, if blocked, retry, else, execute locally
@ -105,37 +105,32 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
listener.onFailure(blockException);
return;
}
clusterService.add(request.masterNodeTimeout(), new TimeoutClusterStateListener() {
@Override
public void postAdded() {
ClusterBlockException blockException = checkBlock(request, clusterService.state());
if (blockException == null || !blockException.retryable()) {
clusterService.remove(this);
innerExecute(request, listener, false);
logger.trace("can't execute due to a cluster block: [{}], retrying", blockException);
observer.waitForNextChange(
new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
innerExecute(request, listener, observer, false);
}
@Override
public void onClusterServiceClose() {
listener.onFailure(blockException);
}
@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(blockException);
}
}, new ClusterStateObserver.ValidationPredicate() {
@Override
protected boolean validate(ClusterState newState) {
ClusterBlockException blockException = checkBlock(request, newState);
return (blockException == null || !blockException.retryable());
}
}
}
);
@Override
public void onClose() {
clusterService.remove(this);
listener.onFailure(blockException);
}
@Override
public void onTimeout(TimeValue timeout) {
clusterService.remove(this);
listener.onFailure(blockException);
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
ClusterBlockException blockException = checkBlock(request, event.state());
if (blockException == null || !blockException.retryable()) {
clusterService.remove(this);
innerExecute(request, listener, false);
}
}
});
} else {
try {
threadPool.executor(executor).execute(new Runnable() {
@ -158,38 +153,35 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
listener.onFailure(new MasterNotDiscoveredException());
} else {
logger.debug("no known master node, scheduling a retry");
observer.waitForNextChange(
new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
innerExecute(request, listener, observer, true);
}
clusterService.add(request.masterNodeTimeout(), new TimeoutClusterStateListener() {
@Override
public void postAdded() {
ClusterState clusterStateV2 = clusterService.state();
if (clusterStateV2.nodes().masterNodeId() != null) {
// now we have a master, try and execute it...
clusterService.remove(this);
innerExecute(request, listener, true);
@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(new MasterNotDiscoveredException("waited for [" + timeout + "]"));
}
}, new ClusterStateObserver.ChangePredicate() {
@Override
public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus,
ClusterState newState, ClusterState.ClusterStateStatus newStatus) {
return newState.nodes().masterNodeId() != null;
}
@Override
public boolean apply(ClusterChangedEvent event) {
return event.nodesDelta().masterNodeChanged();
}
}
}
@Override
public void onClose() {
clusterService.remove(this);
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
public void onTimeout(TimeValue timeout) {
clusterService.remove(this);
listener.onFailure(new MasterNotDiscoveredException("waited for [" + timeout + "]"));
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.nodesDelta().masterNodeChanged()) {
clusterService.remove(this);
innerExecute(request, listener, true);
}
}
});
);
}
return;
}
@ -216,38 +208,28 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
// we want to retry here a bit to see if a new master is elected
logger.debug("connection exception while trying to forward request to master node [{}], scheduling a retry. Error: [{}]",
nodes.masterNode(), exp.getDetailedMessage());
clusterService.add(request.masterNodeTimeout(), new TimeoutClusterStateListener() {
@Override
public void postAdded() {
ClusterState clusterStateV2 = clusterService.state();
// checking for changes that happened while adding the listener. We can't check using cluster
// state versions as mater election doesn't increase version numbers
if (clusterState != clusterStateV2) {
clusterService.remove(this);
innerExecute(request, listener, false);
}
}
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
innerExecute(request, listener, observer, false);
}
@Override
public void onClose() {
clusterService.remove(this);
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
public void onTimeout(TimeValue timeout) {
clusterService.remove(this);
listener.onFailure(new MasterNotDiscoveredException());
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.nodesDelta().masterNodeChanged()) {
clusterService.remove(this);
innerExecute(request, listener, false);
}
}
});
@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(new MasterNotDiscoveredException());
}
}, new ClusterStateObserver.EventPredicate() {
@Override
public boolean apply(ClusterChangedEvent event) {
return event.nodesDelta().masterNodeChanged();
}
}
);
} else {
listener.onFailure(exp);
}

View File

@ -28,10 +28,9 @@ import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.TimeoutClusterStateListener;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -317,11 +316,10 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
private final ActionListener<Response> listener;
private final Request request;
private volatile ClusterState clusterState;
private volatile ShardIterator shardIt;
private final AtomicBoolean primaryOperationStarted = new AtomicBoolean();
private final ReplicationType replicationType;
protected final long startTime = System.currentTimeMillis();
private volatile ClusterStateObserver observer;
AsyncShardOperationAction(Request request, ActionListener<Response> listener) {
this.request = request;
@ -335,40 +333,40 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
public void start() {
start(false);
observer = new ClusterStateObserver(clusterService, request.timeout(), logger);
doStart();
}
/**
* Returns <tt>true</tt> if the action starting to be performed on the primary (or is done).
*/
public boolean start(final boolean fromClusterEvent) throws ElasticsearchException {
this.clusterState = clusterService.state();
protected boolean doStart() throws ElasticsearchException {
try {
ClusterBlockException blockException = checkGlobalBlock(clusterState, request);
ClusterBlockException blockException = checkGlobalBlock(observer.observedState(), request);
if (blockException != null) {
if (blockException.retryable()) {
logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
retry(fromClusterEvent, blockException);
retry(blockException);
return false;
} else {
throw blockException;
}
}
// check if we need to execute, and if not, return
if (!resolveRequest(clusterState, request, listener)) {
if (!resolveRequest(observer.observedState(), request, listener)) {
return true;
}
blockException = checkRequestBlock(clusterState, request);
blockException = checkRequestBlock(observer.observedState(), request);
if (blockException != null) {
if (blockException.retryable()) {
logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage());
retry(fromClusterEvent, blockException);
retry(blockException);
return false;
} else {
throw blockException;
}
}
shardIt = shards(clusterState, request);
shardIt = shards(observer.observedState(), request);
} catch (Throwable e) {
listener.onFailure(e);
return true;
@ -378,7 +376,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
if (shardIt.size() == 0) {
logger.trace("no shard instances known for shard [{}], scheduling a retry", shardIt.shardId());
retry(fromClusterEvent, null);
retry(null);
return false;
}
@ -390,9 +388,9 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
if (!shard.primary()) {
continue;
}
if (!shard.active() || !clusterState.nodes().nodeExists(shard.currentNodeId())) {
if (!shard.active() || !observer.observedState().nodes().nodeExists(shard.currentNodeId())) {
logger.trace("primary shard [{}] is not yet active or we do not know the node it is assigned to [{}], scheduling a retry.", shard.shardId(), shard.currentNodeId());
retry(fromClusterEvent, null);
retry(null);
return false;
}
@ -413,7 +411,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
if (shardIt.sizeActive() < requiredNumber) {
logger.trace("not enough active copies of shard [{}] to meet write consistency of [{}] (have {}, needed {}), scheduling a retry.",
shard.shardId(), consistencyLevel, shardIt.sizeActive(), requiredNumber);
retry(fromClusterEvent, null);
retry(null);
return false;
}
}
@ -423,7 +421,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
foundPrimary = true;
if (shard.currentNodeId().equals(clusterState.nodes().localNodeId())) {
if (shard.currentNodeId().equals(observer.observedState().nodes().localNodeId())) {
try {
if (request.operationThreaded()) {
request.beforeLocalFork();
@ -431,20 +429,20 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
@Override
public void run() {
try {
performOnPrimary(shard.id(), shard, clusterState);
performOnPrimary(shard.id(), shard, observer.observedState());
} catch (Throwable t) {
listener.onFailure(t);
}
}
});
} else {
performOnPrimary(shard.id(), shard, clusterState);
performOnPrimary(shard.id(), shard, observer.observedState());
}
} catch (Throwable t) {
listener.onFailure(t);
}
} else {
DiscoveryNode node = clusterState.nodes().get(shard.currentNodeId());
DiscoveryNode node = observer.observedState().nodes().get(shard.currentNodeId());
transportService.sendRequest(node, transportAction, request, transportOptions, new BaseTransportResponseHandler<Response>() {
@Override
@ -471,7 +469,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
// we already marked it as started when we executed it (removed the listener) so pass false
// to re-add to the cluster listener
logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage());
retry(false, null);
retry(null);
} else {
listener.onFailure(exp);
}
@ -483,67 +481,38 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
// we won't find a primary if there are no shards in the shard iterator, retry...
if (!foundPrimary) {
logger.trace("couldn't find a eligible primary shard, scheduling for retry.");
retry(fromClusterEvent, null);
retry(null);
return false;
}
return true;
}
void retry(boolean fromClusterEvent, @Nullable final Throwable failure) {
if (fromClusterEvent) {
logger.trace("retry scheduling ignored as it as we already have a listener in place");
void retry(@Nullable final Throwable failure) {
if (observer.isTimedOut()) {
// we running as a last attempt after a timeout has happened. don't retry
return;
}
// make it threaded operation so we fork on the discovery listener thread
request.beforeLocalFork();
request.operationThreaded(true);
TimeValue timeout = new TimeValue(request.timeout().millis() - (System.currentTimeMillis() - startTime));
if (timeout.millis() <= 0) {
raiseTimeoutFailure(timeout, failure);
return;
}
clusterService.add(timeout, new TimeoutClusterStateListener() {
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void postAdded() {
// check if state version changed while we were adding this listener
long sampledVersion = clusterState.version();
long currentVersion = clusterService.state().version();
if (sampledVersion != currentVersion) {
logger.trace("state change while we were trying to add listener, trying to start again, sampled_version [{}], current_version [{}]", sampledVersion, currentVersion);
if (start(true)) {
// if we managed to start and perform the operation on the primary, we can remove this listener
clusterService.remove(this);
}
}
public void onNewClusterState(ClusterState state) {
doStart();
}
@Override
public void onClose() {
clusterService.remove(this);
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
logger.trace("cluster changed (version {}), trying to start again", event.state().version());
if (start(true)) {
// if we managed to start and perform the operation on the primary, we can remove this listener
clusterService.remove(this);
}
}
@Override
public void onTimeout(TimeValue timeValue) {
// just to be on the safe side, see if we can start it now?
if (start(true)) {
clusterService.remove(this);
public void onTimeout(TimeValue timeout) {
if (doStart()) {
return;
}
clusterService.remove(this);
raiseTimeoutFailure(timeValue, failure);
raiseTimeoutFailure(timeout, failure);
}
});
}
@ -568,7 +537,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
if (retryPrimaryException(e)) {
primaryOperationStarted.set(false);
logger.trace("had an error while performing operation on primary ({}), scheduling a retry.", e.getMessage());
retry(false, e);
retry(e);
return;
}
if (e instanceof ElasticsearchException && ((ElasticsearchException) e).status() == RestStatus.CONFLICT) {
@ -601,7 +570,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
// new primary shard as well...
ClusterState newState = clusterService.state();
ShardRouting newPrimaryShard = null;
if (clusterState != newState) {
if (observer.observedState() != newState) {
shardIt.reset();
ShardRouting originalPrimaryShard = null;
while ((shard = shardIt.nextOrNull()) != null) {
@ -614,7 +583,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
throw new ElasticsearchIllegalStateException("unexpected state, failed to find primary shard on an index operation that succeeded");
}
clusterState = newState;
observer.reset(newState);
shardIt = shards(newState, request);
while ((shard = shardIt.nextOrNull()) != null) {
if (shard.primary()) {
@ -654,7 +623,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
AtomicInteger counter = new AtomicInteger(replicaCounter);
IndexMetaData indexMetaData = clusterState.metaData().index(request.index());
IndexMetaData indexMetaData = observer.observedState().metaData().index(request.index());
if (newPrimaryShard != null) {
performOnReplica(response, counter, newPrimaryShard, newPrimaryShard.currentNodeId(), indexMetaData);
@ -699,7 +668,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
void performOnReplica(final PrimaryResponse<Response, ReplicaRequest> response, final AtomicInteger counter, final ShardRouting shard, String nodeId, final IndexMetaData indexMetaData) {
// if we don't have that node, it means that it might have failed and will be created again, in
// this case, we don't have to do the operation, and just let it failover
if (!clusterState.nodes().nodeExists(nodeId)) {
if (!observer.observedState().nodes().nodeExists(nodeId)) {
if (counter.decrementAndGet() == 0) {
listener.onResponse(response.response());
}
@ -707,8 +676,8 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId().id(), response.replicaRequest());
if (!nodeId.equals(clusterState.nodes().localNodeId())) {
final DiscoveryNode node = clusterState.nodes().get(nodeId);
if (!nodeId.equals(observer.observedState().nodes().localNodeId())) {
final DiscoveryNode node = observer.observedState().nodes().get(nodeId);
transportService.sendRequest(node, transportReplicaAction, shardRequest, transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty vResponse) {

View File

@ -24,10 +24,9 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.TimeoutClusterStateListener;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@ -119,42 +118,44 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
private final AtomicBoolean operationStarted = new AtomicBoolean();
private volatile ClusterStateObserver observer;
private AsyncSingleAction(Request request, ActionListener<Response> listener) {
this.request = request;
this.listener = listener;
}
public void start() {
start(false);
observer = new ClusterStateObserver(clusterService, request.timeout(), logger);
doStart();
}
public boolean start(final boolean fromClusterEvent) throws ElasticsearchException {
final ClusterState clusterState = clusterService.state();
nodes = clusterState.nodes();
protected boolean doStart() throws ElasticsearchException {
nodes = observer.observedState().nodes();
try {
ClusterBlockException blockException = checkGlobalBlock(clusterState, request);
ClusterBlockException blockException = checkGlobalBlock(observer.observedState(), request);
if (blockException != null) {
if (blockException.retryable()) {
retry(fromClusterEvent, blockException);
retry(blockException);
return false;
} else {
throw blockException;
}
}
// check if we need to execute, and if not, return
if (!resolveRequest(clusterState, request, listener)) {
if (!resolveRequest(observer.observedState(), request, listener)) {
return true;
}
blockException = checkRequestBlock(clusterState, request);
blockException = checkRequestBlock(observer.observedState(), request);
if (blockException != null) {
if (blockException.retryable()) {
retry(fromClusterEvent, blockException);
retry(blockException);
return false;
} else {
throw blockException;
}
}
shardIt = shards(clusterState, request);
shardIt = shards(observer.observedState(), request);
} catch (Throwable e) {
listener.onFailure(e);
return true;
@ -162,7 +163,7 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
// no shardIt, might be in the case between index gateway recovery and shardIt initialization
if (shardIt.size() == 0) {
retry(fromClusterEvent, null);
retry(null);
return false;
}
@ -173,7 +174,7 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
assert shard != null;
if (!shard.active()) {
retry(fromClusterEvent, null);
retry(null);
return false;
}
@ -195,7 +196,7 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
operationStarted.set(false);
// we already marked it as started when we executed it (removed the listener) so pass false
// to re-add to the cluster listener
retry(false, null);
retry(null);
} else {
listener.onFailure(e);
}
@ -204,7 +205,7 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
});
} catch (Throwable e) {
if (retryOnFailure(e)) {
retry(fromClusterEvent, null);
retry(null);
} else {
listener.onFailure(e);
}
@ -236,7 +237,7 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
operationStarted.set(false);
// we already marked it as started when we executed it (removed the listener) so pass false
// to re-add to the cluster listener
retry(false, null);
retry(null);
} else {
listener.onFailure(exp);
}
@ -246,53 +247,41 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
return true;
}
void retry(final boolean fromClusterEvent, final @Nullable Throwable failure) {
if (!fromClusterEvent) {
// make it threaded operation so we fork on the discovery listener thread
request.beforeLocalFork();
clusterService.add(request.timeout(), new TimeoutClusterStateListener() {
@Override
public void postAdded() {
if (start(true)) {
// if we managed to start and perform the operation on the primary, we can remove this listener
clusterService.remove(this);
}
}
void retry(final @Nullable Throwable failure) {
if (observer.isTimedOut()) {
// we running as a last attempt after a timeout has happened. don't retry
return;
}
@Override
public void onClose() {
clusterService.remove(this);
listener.onFailure(new NodeClosedException(nodes.localNode()));
}
// make it threaded operation so we fork on the discovery listener thread
request.beforeLocalFork();
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
doStart();
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (start(true)) {
// if we managed to start and perform the operation on the primary, we can remove this listener
clusterService.remove(this);
}
}
@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(nodes.localNode()));
}
@Override
public void onTimeout(TimeValue timeValue) {
// just to be on the safe side, see if we can start it now?
if (start(true)) {
clusterService.remove(this);
return;
}
clusterService.remove(this);
@Override
public void onTimeout(TimeValue timeout) {
// just to be on the safe side, see if we can start it now?
if (!doStart()) {
Throwable listenFailure = failure;
if (listenFailure == null) {
if (shardIt == null) {
listenFailure = new UnavailableShardsException(new ShardId(request.index(), -1), "Timeout waiting for [" + timeValue + "], request: " + request.toString());
listenFailure = new UnavailableShardsException(new ShardId(request.index(), -1), "Timeout waiting for [" + timeout + "], request: " + request.toString());
} else {
listenFailure = new UnavailableShardsException(shardIt.shardId(), "[" + shardIt.size() + "] shardIt, [" + shardIt.sizeActive() + "] active : Timeout waiting for [" + timeValue + "], request: " + request.toString());
listenFailure = new UnavailableShardsException(shardIt.shardId(), "[" + shardIt.size() + "] shardIt, [" + shardIt.sizeActive() + "] active : Timeout waiting for [" + timeout + "], request: " + request.toString());
}
}
listener.onFailure(listenFailure);
}
});
}
}
}, request.timeout());
}
}

View File

@ -61,6 +61,23 @@ import java.util.Set;
*/
public class ClusterState implements ToXContent {
public static enum ClusterStateStatus {
UNKNOWN((byte) 0),
RECEIVED((byte) 1),
BEING_APPLIED((byte) 2),
APPLIED((byte) 3);
private final byte id;
ClusterStateStatus(byte id) {
this.id = id;
}
public byte id() {
return this.id;
}
}
public interface Custom {
interface Factory<T extends Custom> {
@ -117,6 +134,8 @@ public class ClusterState implements ToXContent {
private SettingsFilter settingsFilter;
private volatile ClusterStateStatus status;
public ClusterState(long version, ClusterState state) {
this(state.clusterName, version, state.metaData(), state.routingTable(), state.nodes(), state.blocks(), state.customs());
}
@ -129,6 +148,16 @@ public class ClusterState implements ToXContent {
this.nodes = nodes;
this.blocks = blocks;
this.customs = customs;
this.status = ClusterStateStatus.UNKNOWN;
}
public ClusterStateStatus status() {
return status;
}
public ClusterState status(ClusterStateStatus newStatus) {
this.status = newStatus;
return this;
}
public long version() {

View File

@ -0,0 +1,327 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.TimeValue;
import java.util.concurrent.atomic.AtomicReference;
/**
* A utility class which simplifies interacting with the cluster state in cases where
* one tries to take action based on the current state but may want to wait for a new state
* and retry upon failure.
*/
public class ClusterStateObserver {
protected final ESLogger logger;
public final ChangePredicate MATCH_ALL_CHANGES_PREDICATE = new EventPredicate() {
@Override
public boolean apply(ClusterChangedEvent changedEvent) {
return changedEvent.previousState().version() != changedEvent.state().version();
}
};
private ClusterService clusterService;
volatile TimeValue timeOutValue;
final AtomicReference<ObservedState> lastObservedState;
// observingContext is not null when waiting on cluster state changes
final AtomicReference<ObservingContext> observingContext = new AtomicReference<ObservingContext>(null);
volatile long startTime;
volatile boolean timedOut;
volatile TimeoutClusterStateListener clusterStateListener = new ObserverClusterStateListener();
public ClusterStateObserver(ClusterService clusterService, ESLogger logger) {
this(clusterService, new TimeValue(60000), logger);
}
/**
* @param clusterService
* @param timeout a global timeout for this observer. After it has expired the observer
* will fail any existing or new #waitForNextChange calls.
*/
public ClusterStateObserver(ClusterService clusterService, TimeValue timeout, ESLogger logger) {
this.timeOutValue = timeout;
this.clusterService = clusterService;
this.lastObservedState = new AtomicReference<>(new ObservedState(clusterService.state()));
this.startTime = System.currentTimeMillis();
this.logger = logger;
}
/** last cluster state observer by this observer. Note that this may not be the current one */
public ClusterState observedState() {
ObservedState state = lastObservedState.get();
assert state != null;
return state.clusterState;
}
/** indicates whether this observer has timedout */
public boolean isTimedOut() {
return timedOut;
}
public void waitForNextChange(Listener listener) {
waitForNextChange(listener, MATCH_ALL_CHANGES_PREDICATE);
}
public void waitForNextChange(Listener listener, @Nullable TimeValue timeOutValue) {
waitForNextChange(listener, MATCH_ALL_CHANGES_PREDICATE, timeOutValue);
}
public void waitForNextChange(Listener listener, ChangePredicate changePredicate) {
waitForNextChange(listener, changePredicate, null);
}
/**
* Wait for the next cluster state which satisfies changePredicate
*
* @param listener callback listener
* @param changePredicate predicate to check whether cluster state changes are relevant and the callback should be called
* @param timeOutValue a timeout for waiting. If null the global observer timeout will be used.
*/
public void waitForNextChange(Listener listener, ChangePredicate changePredicate, @Nullable TimeValue timeOutValue) {
if (observingContext.get() != null) {
throw new ElasticsearchException("already waiting for a cluster state change");
}
long timeoutTimeLeft;
if (timeOutValue == null) {
timeOutValue = this.timeOutValue;
long timeSinceStart = System.currentTimeMillis() - startTime;
timeoutTimeLeft = timeOutValue.millis() - timeSinceStart;
if (timeoutTimeLeft <= 0l) {
// things have timeout while we were busy -> notify
logger.debug("observer timed out. notifying listener. timeout setting [{}], time since start [{}]", timeOutValue, new TimeValue(timeSinceStart));
// update to latest, in case people want to retry
timedOut = true;
lastObservedState.set(new ObservedState(clusterService.state()));
listener.onTimeout(timeOutValue);
return;
}
} else {
this.startTime = System.currentTimeMillis();
this.timeOutValue = timeOutValue;
timeoutTimeLeft = timeOutValue.millis();
timedOut = false;
}
// sample a new state
ObservedState newState = new ObservedState(clusterService.state());
ObservedState lastState = lastObservedState.get();
if (changePredicate.apply(lastState.clusterState, lastState.status, newState.clusterState, newState.status)) {
// good enough, let's go.
logger.trace("observer: sampled state accepted by predicate ({})", newState);
lastObservedState.set(newState);
listener.onNewClusterState(newState.clusterState);
} else {
logger.trace("observer: sampled state rejected by predicate ({}). adding listener to ClusterService", newState);
ObservingContext context = new ObservingContext(listener, changePredicate);
if (!observingContext.compareAndSet(null, context)) {
throw new ElasticsearchException("already waiting for a cluster state change");
}
clusterService.add(new TimeValue(timeoutTimeLeft), clusterStateListener);
}
}
public void close() {
if (observingContext.getAndSet(null) != null) {
clusterService.remove(clusterStateListener);
logger.trace("cluster state observer closed");
}
}
/**
* reset this observer to the give cluster state. Any pending waits will be canceled.
*
* @param toState
*/
public void reset(ClusterState toState) {
if (observingContext.getAndSet(null) != null) {
clusterService.remove(clusterStateListener);
}
lastObservedState.set(new ObservedState(toState));
}
class ObserverClusterStateListener implements TimeoutClusterStateListener {
@Override
public void clusterChanged(ClusterChangedEvent event) {
ObservingContext context = observingContext.get();
if (context == null) {
// No need to remove listener as it is the responsibility of the thread that set observingContext to null
return;
}
if (context.changePredicate.apply(event)) {
if (observingContext.compareAndSet(context, null)) {
clusterService.remove(this);
ObservedState state = new ObservedState(event.state());
logger.trace("observer: accepting cluster state change ({})", state);
lastObservedState.set(state);
context.listener.onNewClusterState(state.clusterState);
} else {
logger.trace("observer: predicate approved change but observing context has changed - ignoring (new cluster state version [{}])", event.state().version());
}
} else {
logger.trace("observer: predicate rejected change (new cluster state version [{}])", event.state().version());
}
}
@Override
public void postAdded() {
ObservingContext context = observingContext.get();
if (context == null) {
// No need to remove listener as it is the responsibility of the thread that set observingContext to null
return;
}
ObservedState newState = new ObservedState(clusterService.state());
ObservedState lastState = lastObservedState.get();
if (context.changePredicate.apply(lastState.clusterState, lastState.status, newState.clusterState, newState.status)) {
// double check we're still listening
if (observingContext.compareAndSet(context, null)) {
logger.trace("observer: post adding listener: accepting current cluster state ({})", newState);
clusterService.remove(this);
lastObservedState.set(newState);
context.listener.onNewClusterState(newState.clusterState);
} else {
logger.trace("observer: postAdded - predicate approved state but observing context has changed - ignoring ({})", newState);
}
} else {
logger.trace("observer: postAdded - predicate rejected state ({})", newState);
}
}
@Override
public void onClose() {
ObservingContext context = observingContext.getAndSet(null);
if (context != null) {
logger.trace("observer: cluster service closed. notifying listener.");
clusterService.remove(this);
context.listener.onClusterServiceClose();
}
}
@Override
public void onTimeout(TimeValue timeout) {
ObservingContext context = observingContext.getAndSet(null);
if (context != null) {
clusterService.remove(this);
long timeSinceStart = System.currentTimeMillis() - startTime;
logger.debug("observer: timeout notification from cluster service. timeout setting [{}], time since start [{}]", timeOutValue, new TimeValue(timeSinceStart));
// update to latest, in case people want to retry
lastObservedState.set(new ObservedState(clusterService.state()));
timedOut = true;
context.listener.onTimeout(timeOutValue);
}
}
}
public interface Listener {
/** called when a new state is observed */
void onNewClusterState(ClusterState state);
/** called when the cluster service is closed */
void onClusterServiceClose();
void onTimeout(TimeValue timeout);
}
public interface ChangePredicate {
/**
* a rough check used when starting to monitor for a new change. Called infrequently can be less accurate.
*
* @return true if newState should be accepted
*/
public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus,
ClusterState newState, ClusterState.ClusterStateStatus newStatus);
/**
* called to see whether a cluster change should be accepted
*
* @return true if changedEvent.state() should be accepted
*/
public boolean apply(ClusterChangedEvent changedEvent);
}
public static abstract class ValidationPredicate implements ChangePredicate {
@Override
public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus, ClusterState newState, ClusterState.ClusterStateStatus newStatus) {
if (previousState != newState || previousStatus != newStatus) {
return validate(newState);
}
return false;
}
protected abstract boolean validate(ClusterState newState);
@Override
public boolean apply(ClusterChangedEvent changedEvent) {
if (changedEvent.previousState().version() != changedEvent.state().version()) {
return validate(changedEvent.state());
}
return false;
}
}
public static abstract class EventPredicate implements ChangePredicate {
@Override
public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus, ClusterState newState, ClusterState.ClusterStateStatus newStatus) {
return previousState != newState || previousStatus != newStatus;
}
}
static class ObservingContext {
final public Listener listener;
final public ChangePredicate changePredicate;
public ObservingContext(Listener listener, ChangePredicate changePredicate) {
this.listener = listener;
this.changePredicate = changePredicate;
}
}
static class ObservedState {
final public ClusterState clusterState;
final public ClusterState.ClusterStateStatus status;
public ObservedState(ClusterState clusterState) {
this.clusterState = clusterState;
this.status = clusterState.status();
}
@Override
public String toString() {
return "version [" + clusterState.version() + "], status [" + status + "]";
}
}
}

View File

@ -376,6 +376,8 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
}
}
newClusterState.status(ClusterState.ClusterStateStatus.BEING_APPLIED);
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder("cluster state updated:\nversion [").append(newClusterState.version()).append("], source [").append(source).append("]\n");
sb.append(newClusterState.nodes().prettyPrint());
@ -442,6 +444,8 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
});
}
newClusterState.status(ClusterState.ClusterStateStatus.APPLIED);
//manual ack only from the master at the end of the publish
if (newClusterState.nodes().localNodeMaster()) {
try {

View File

@ -302,6 +302,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
continue;
}
final ClusterState nodeSpecificClusterState = ClusterState.Builder.fromBytes(clusterStateBytes, discovery.localNode);
nodeSpecificClusterState.status(ClusterState.ClusterStateStatus.RECEIVED);
// ignore cluster state messages that do not include "me", not in the game yet...
if (nodeSpecificClusterState.nodes().localNode() != null) {
discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateUpdateTask() {

View File

@ -170,7 +170,7 @@ public class PublishClusterStateAction extends AbstractComponent {
}
in.setVersion(request.version());
ClusterState clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode());
clusterState.status(ClusterState.ClusterStateStatus.RECEIVED);
logger.debug("received cluster state version {}", clusterState.version());
listener.onNewClusterState(clusterState, new NewClusterStateListener.NewStateProcessed() {
@Override

View File

@ -29,6 +29,7 @@ import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test;
import java.util.HashMap;
@ -43,6 +44,7 @@ import static org.hamcrest.Matchers.greaterThan;
public class NoMasterNodeTests extends ElasticsearchIntegrationTest {
@Test
@TestLogging("action:TRACE,cluster.service:TRACE")
public void testNoMasterActions() throws Exception {
Settings settings = settingsBuilder()
.put("discovery.type", "zen")

View File

@ -64,6 +64,7 @@ import static org.hamcrest.Matchers.*;
/**
*
*/
public class SimpleFacetsTests extends ElasticsearchIntegrationTest {
private int numRuns = -1;
@ -299,14 +300,14 @@ public class SimpleFacetsTests extends ElasticsearchIntegrationTest {
@Slow
public void testConcurrentFacets() throws ElasticsearchException, IOException, InterruptedException, ExecutionException {
assertAcked(prepareCreate("test")
.addMapping("type", jsonBuilder().startObject().startObject("type").startObject("properties")
.startObject("byte").field("type", "byte").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject()
.startObject("short").field("type", "short").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject()
.startObject("integer").field("type", "integer").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject()
.startObject("long").field("type", "long").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject()
.startObject("float").field("type", "float").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject()
.startObject("double").field("type", "double").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject()
.endObject().endObject().endObject()));
.addMapping("type", jsonBuilder().startObject().startObject("type").startObject("properties")
.startObject("byte").field("type", "byte").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject()
.startObject("short").field("type", "short").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject()
.startObject("integer").field("type", "integer").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject()
.startObject("long").field("type", "long").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject()
.startObject("float").field("type", "float").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject()
.startObject("double").field("type", "double").startObject("fielddata").field("format", maybeDocValues() ? "doc_values" : null).endObject().endObject()
.endObject().endObject().endObject()));
ensureGreen();
for (int i = 0; i < 100; i++) {
@ -327,7 +328,9 @@ public class SimpleFacetsTests extends ElasticsearchIntegrationTest {
.endObject()).execute().actionGet();
}
logger.info("done indexing. issue a refresh.");
flushAndRefresh();
final AtomicInteger searchId = new AtomicInteger(0);
ConcurrentDuel<Facets> duel = new ConcurrentDuel<>(5);
{
final Client cl = client();
@ -362,7 +365,11 @@ public class SimpleFacetsTests extends ElasticsearchIntegrationTest {
@Override
public Facets run() {
final SearchRequestBuilder facetRequest;
if (count.incrementAndGet() % 2 == 0) { // every second request is mapped
int searchId = count.incrementAndGet();
if (searchId % 100 == 0) {
logger.info("-> run {} searches", searchId);
}
if (searchId % 2 == 0) { // every second request is mapped
facetRequest = cl.prepareSearch().setQuery(matchAllQuery())
.addFacet(termsFacet("double").field("double").size(10))
.addFacet(termsFacet("float").field("float").size(10))
@ -389,6 +396,7 @@ public class SimpleFacetsTests extends ElasticsearchIntegrationTest {
}, 5000
);
}
logger.info("starting second duel");
{
duel.duel(new ConcurrentDuel.DuelJudge<Facets>() {
@ -420,7 +428,12 @@ public class SimpleFacetsTests extends ElasticsearchIntegrationTest {
@Override
public Facets run() {
final SearchRequestBuilder facetRequest;
switch (count.incrementAndGet() % 6) {
int searchId = count.incrementAndGet();
if (searchId % 100 == 0) {
logger.info("-> run {} searches", searchId);
}
switch (searchId % 6) {
case 4:
facetRequest = client().prepareSearch()
.setQuery(matchAllQuery())
@ -496,7 +509,7 @@ public class SimpleFacetsTests extends ElasticsearchIntegrationTest {
.startObject("filtered")
.field("type", "string")
.startObject("fielddata").field("format", "fst").field("loading", randomBoolean() ? "eager" : "lazy").startObject("filter")
.startObject("regex").field("pattern", "\\d{1,2}").endObject().endObject()
.startObject("regex").field("pattern", "\\d{1,2}").endObject().endObject()
.endObject()
// only 1 or 2 digits
.endObject()
@ -525,6 +538,7 @@ public class SimpleFacetsTests extends ElasticsearchIntegrationTest {
.endObject()).execute().actionGet();
}
logger.info("done indexing. refreshing.");
flushAndRefresh();
ConcurrentDuel<Facets> duel = new ConcurrentDuel<>(5);
String[] fieldPostFix = new String[]{"", "_mv"};
@ -567,6 +581,9 @@ public class SimpleFacetsTests extends ElasticsearchIntegrationTest {
final SearchRequestBuilder facetRequest;
int incrementAndGet = count.incrementAndGet();
final String field;
if (incrementAndGet % 100 == 0) {
logger.info("-> run {} searches", incrementAndGet);
}
switch (incrementAndGet % 2) {
case 1:
field = "filtered" + postfix;
@ -612,7 +629,7 @@ public class SimpleFacetsTests extends ElasticsearchIntegrationTest {
SearchResponse actionGet = facetRequest.execute().actionGet();
return actionGet.getFacets();
}
}, 5000
}, 2000
);
}