Cluster State Update APIs (master node) to respect master_timeout better

Currently, the master node might be processing too many cluster state events, and then be blocked on waiting for its respective even to be processed. We can use the new cluster state update timeout support to use the master_timeout value and respect it.

closes #3365
This commit is contained in:
Shay Banon 2013-07-22 16:58:00 +02:00
parent 0b33394476
commit 235a68c3bd
41 changed files with 329 additions and 54 deletions

View File

@ -23,12 +23,14 @@ import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -76,7 +78,19 @@ public class TransportClusterRerouteAction extends TransportMasterNodeOperationA
final AtomicReference<ClusterState> clusterStateResponse = new AtomicReference<ClusterState>();
final CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("cluster_reroute (api)", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("cluster_reroute (api)", Priority.URGENT, new TimeoutClusterStateUpdateTask() {
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
@Override
public void onTimeout(TimeValue timeout, String source) {
failureRef.set(new ProcessClusterEventTimeoutException(timeout, source));
latch.countDown();
}
@Override
public ClusterState execute(ClusterState currentState) {
try {
@ -87,7 +101,7 @@ public class TransportClusterRerouteAction extends TransportMasterNodeOperationA
return currentState;
}
return newState;
} catch (Exception e) {
} catch (Throwable e) {
logger.debug("failed to reroute", e);
failureRef.set(e);
latch.countDown();

View File

@ -24,9 +24,10 @@ import org.elasticsearch.action.support.master.TransportMasterNodeOperationActio
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.settings.ClusterDynamicSettings;
@ -35,9 +36,7 @@ import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -92,7 +91,19 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe
final ImmutableSettings.Builder transientUpdates = ImmutableSettings.settingsBuilder();
final ImmutableSettings.Builder persistentUpdates = ImmutableSettings.settingsBuilder();
clusterService.submitStateUpdateTask("cluster_update_settings", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("cluster_update_settings", Priority.URGENT, new TimeoutClusterStateUpdateTask() {
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
@Override
public void onTimeout(TimeValue timeout, String source) {
failureRef.set(new ProcessClusterEventTimeoutException(timeout, source));
latch.countDown();
}
@Override
public ClusterState execute(ClusterState currentState) {
try {
@ -149,7 +160,7 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe
}
return ClusterState.builder().state(currentState).metaData(metaData).blocks(blocks).build();
} catch (Exception e) {
} catch (Throwable e) {
latch.countDown();
logger.warn("failed to update cluster settings", e);
return currentState;

View File

@ -85,7 +85,7 @@ public class TransportIndicesAliasesAction extends TransportMasterNodeOperationA
final AtomicReference<IndicesAliasesResponse> responseRef = new AtomicReference<IndicesAliasesResponse>();
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);
indexAliasesService.indicesAliases(new MetaDataIndexAliasesService.Request(request.aliasActions().toArray(new AliasAction[request.aliasActions().size()]), request.timeout()), new MetaDataIndexAliasesService.Listener() {
indexAliasesService.indicesAliases(new MetaDataIndexAliasesService.Request(request.aliasActions().toArray(new AliasAction[request.aliasActions().size()]), request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataIndexAliasesService.Listener() {
@Override
public void onResponse(MetaDataIndexAliasesService.Response response) {
responseRef.set(new IndicesAliasesResponse(response.acknowledged()));

View File

@ -99,7 +99,7 @@ public class TransportCloseIndexAction extends TransportMasterNodeOperationActio
final AtomicReference<CloseIndexResponse> responseRef = new AtomicReference<CloseIndexResponse>();
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);
stateIndexService.closeIndex(new MetaDataStateIndexService.Request(request.indices()).timeout(request.timeout()), new MetaDataStateIndexService.Listener() {
stateIndexService.closeIndex(new MetaDataStateIndexService.Request(request.indices()).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataStateIndexService.Listener() {
@Override
public void onResponse(MetaDataStateIndexService.Response response) {
responseRef.set(new CloseIndexResponse(response.acknowledged()));

View File

@ -86,7 +86,8 @@ public class TransportCreateIndexAction extends TransportMasterNodeOperationActi
createIndexService.createIndex(new MetaDataCreateIndexService.Request(cause, request.index()).settings(request.settings())
.mappings(request.mappings())
.customs(request.customs())
.timeout(request.timeout()),
.timeout(request.timeout())
.masterTimeout(request.masterNodeTimeout()),
new MetaDataCreateIndexService.Listener() {
@Override
public void onResponse(MetaDataCreateIndexService.Response response) {

View File

@ -108,7 +108,7 @@ public class TransportDeleteIndexAction extends TransportMasterNodeOperationActi
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(request.indices().length);
for (final String index : request.indices()) {
deleteIndexService.deleteIndex(new MetaDataDeleteIndexService.Request(index).timeout(request.timeout()), new MetaDataDeleteIndexService.Listener() {
deleteIndexService.deleteIndex(new MetaDataDeleteIndexService.Request(index).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataDeleteIndexService.Listener() {
@Override
public void onResponse(MetaDataDeleteIndexService.Response response) {
responseRef.set(new DeleteIndexResponse(response.acknowledged()));

View File

@ -114,7 +114,7 @@ public class TransportDeleteMappingAction extends TransportMasterNodeOperationAc
refreshAction.execute(Requests.refreshRequest(request.indices()), new ActionListener<RefreshResponse>() {
@Override
public void onResponse(RefreshResponse refreshResponse) {
metaDataMappingService.removeMapping(new MetaDataMappingService.RemoveRequest(request.indices(), request.type()), new MetaDataMappingService.Listener() {
metaDataMappingService.removeMapping(new MetaDataMappingService.RemoveRequest(request.indices(), request.type()).masterTimeout(request.masterNodeTimeout()), new MetaDataMappingService.Listener() {
@Override
public void onResponse(MetaDataMappingService.Response response) {
latch.countDown();
@ -130,7 +130,7 @@ public class TransportDeleteMappingAction extends TransportMasterNodeOperationAc
@Override
public void onFailure(Throwable e) {
metaDataMappingService.removeMapping(new MetaDataMappingService.RemoveRequest(request.indices(), request.type()), new MetaDataMappingService.Listener() {
metaDataMappingService.removeMapping(new MetaDataMappingService.RemoveRequest(request.indices(), request.type()).masterTimeout(request.masterNodeTimeout()), new MetaDataMappingService.Listener() {
@Override
public void onResponse(MetaDataMappingService.Response response) {
latch.countDown();

View File

@ -90,7 +90,7 @@ public class TransportPutMappingAction extends TransportMasterNodeOperationActio
final AtomicReference<PutMappingResponse> responseRef = new AtomicReference<PutMappingResponse>();
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);
metaDataMappingService.putMapping(new MetaDataMappingService.PutRequest(request.indices(), request.type(), request.source()).ignoreConflicts(request.ignoreConflicts()).timeout(request.timeout()), new MetaDataMappingService.Listener() {
metaDataMappingService.putMapping(new MetaDataMappingService.PutRequest(request.indices(), request.type(), request.source()).ignoreConflicts(request.ignoreConflicts()).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataMappingService.Listener() {
@Override
public void onResponse(MetaDataMappingService.Response response) {
responseRef.set(new PutMappingResponse(response.acknowledged()));

View File

@ -85,7 +85,7 @@ public class TransportOpenIndexAction extends TransportMasterNodeOperationAction
final AtomicReference<OpenIndexResponse> responseRef = new AtomicReference<OpenIndexResponse>();
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);
stateIndexService.openIndex(new MetaDataStateIndexService.Request(request.indices()).timeout(request.timeout()), new MetaDataStateIndexService.Listener() {
stateIndexService.openIndex(new MetaDataStateIndexService.Request(request.indices()).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataStateIndexService.Listener() {
@Override
public void onResponse(MetaDataStateIndexService.Response response) {
responseRef.set(new OpenIndexResponse(response.acknowledged()));

View File

@ -71,7 +71,7 @@ public class TransportUpdateSettingsAction extends TransportMasterNodeOperationA
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);
updateSettingsService.updateSettings(request.settings(), request.indices(), new MetaDataUpdateSettingsService.Listener() {
updateSettingsService.updateSettings(request.settings(), request.indices(), request.masterNodeTimeout(), new MetaDataUpdateSettingsService.Listener() {
@Override
public void onSuccess() {
latch.countDown();

View File

@ -79,7 +79,7 @@ public class TransportDeleteIndexTemplateAction extends TransportMasterNodeOpera
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);
indexTemplateService.removeTemplates(new MetaDataIndexTemplateService.RemoveRequest(request.name()), new MetaDataIndexTemplateService.RemoveListener() {
indexTemplateService.removeTemplates(new MetaDataIndexTemplateService.RemoveRequest(request.name()).masterTimeout(request.masterNodeTimeout()), new MetaDataIndexTemplateService.RemoveListener() {
@Override
public void onResponse(MetaDataIndexTemplateService.RemoveResponse response) {
responseRef.set(new DeleteIndexTemplateResponse(response.acknowledged()));

View File

@ -89,7 +89,8 @@ public class TransportPutIndexTemplateAction extends TransportMasterNodeOperatio
.settings(request.settings())
.mappings(request.mappings())
.customs(request.customs())
.create(request.create()),
.create(request.create())
.masterTimeout(request.masterNodeTimeout()),
new MetaDataIndexTemplateService.PutListener() {
@Override

View File

@ -25,14 +25,16 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.search.warmer.IndexWarmerMissingException;
@ -91,7 +93,19 @@ public class TransportDeleteWarmerAction extends TransportMasterNodeOperationAct
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("delete_warmer [" + request.name() + "]", new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("delete_warmer [" + request.name() + "]", new TimeoutClusterStateUpdateTask() {
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
@Override
public void onTimeout(TimeValue timeout, String source) {
failureRef.set(new ProcessClusterEventTimeoutException(timeout, source));
latch.countDown();
}
@Override
public ClusterState execute(ClusterState currentState) {
try {

View File

@ -25,14 +25,16 @@ import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.search.warmer.IndexWarmersMetaData;
@ -101,7 +103,19 @@ public class TransportPutWarmerAction extends TransportMasterNodeOperationAction
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("put_warmer [" + request.name() + "]", new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("put_warmer [" + request.name() + "]", new TimeoutClusterStateUpdateTask() {
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
@Override
public void onTimeout(TimeValue timeout, String source) {
failureRef.set(new ProcessClusterEventTimeoutException(timeout, source));
latch.countDown();
}
@Override
public ClusterState execute(ClusterState currentState) {
MetaData metaData = currentState.metaData();

View File

@ -25,11 +25,11 @@ import org.elasticsearch.common.unit.TimeValue;
* An extension interface to {@link org.elasticsearch.cluster.ClusterStateUpdateTask} that allows to associate
* a timeout.
*/
public interface TimeoutClusterStateUpdateTask extends ClusterStateUpdateTask {
public interface TimeoutClusterStateUpdateTask extends ProcessedClusterStateUpdateTask {
/**
* If the cluster state update task wasn't processed by the provided timeout, call
* {@link #onTimeout(String)}.
* {@link #onTimeout(TimeValue, String)}.
*/
TimeValue timeout();
@ -37,5 +37,5 @@ public interface TimeoutClusterStateUpdateTask extends ClusterStateUpdateTask {
* Called when the cluster sate update task wasn't processed by the provided
* {@link #timeout()}.
*/
void onTimeout(String source);
void onTimeout(TimeValue timeout, String source);
}

View File

@ -27,9 +27,10 @@ import com.google.common.io.Closeables;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
@ -134,7 +135,18 @@ public class MetaDataCreateIndexService extends AbstractComponent {
final CreateIndexListener listener = new CreateIndexListener(mdLock, request, userListener);
clusterService.submitStateUpdateTask("create-index [" + request.index + "], cause [" + request.cause + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("create-index [" + request.index + "], cause [" + request.cause + "]", Priority.URGENT, new TimeoutClusterStateUpdateTask() {
@Override
public TimeValue timeout() {
return request.masterTimeout;
}
@Override
public void onTimeout(TimeValue timeout, String source) {
listener.onFailure(new ProcessClusterEventTimeoutException(timeout, source));
}
@Override
public ClusterState execute(ClusterState currentState) {
boolean indexCreated = false;
@ -502,7 +514,6 @@ public class MetaDataCreateIndexService extends AbstractComponent {
public static class Request {
final String cause;
final String index;
State state = State.OPEN;
@ -515,6 +526,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
TimeValue timeout = TimeValue.timeValueSeconds(5);
TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT;
Set<ClusterBlock> blocks = Sets.newHashSet();
@ -566,6 +578,11 @@ public class MetaDataCreateIndexService extends AbstractComponent {
this.timeout = timeout;
return this;
}
public Request masterTimeout(TimeValue masterTimeout) {
this.masterTimeout = masterTimeout;
return this;
}
}
public static class Response {

View File

@ -19,9 +19,10 @@
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.routing.RoutingTable;
@ -81,7 +82,18 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
}
final DeleteIndexListener listener = new DeleteIndexListener(mdLock, request, userListener);
clusterService.submitStateUpdateTask("delete-index [" + request.index + "]", Priority.URGENT, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("delete-index [" + request.index + "]", Priority.URGENT, new TimeoutClusterStateUpdateTask() {
@Override
public TimeValue timeout() {
return request.masterTimeout;
}
@Override
public void onTimeout(TimeValue timeout, String source) {
listener.onFailure(new ProcessClusterEventTimeoutException(timeout, source));
}
@Override
public ClusterState execute(ClusterState currentState) {
try {
@ -134,6 +146,10 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
return currentState;
}
}
@Override
public void clusterStateProcessed(ClusterState clusterState) {
}
});
}
@ -191,6 +207,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
final String index;
TimeValue timeout = TimeValue.timeValueSeconds(10);
TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT;
public Request(String index) {
this.index = index;
@ -200,6 +217,11 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
this.timeout = timeout;
return this;
}
public Request masterTimeout(TimeValue masterTimeout) {
this.masterTimeout = masterTimeout;
return this;
}
}
public static class Response {

View File

@ -22,9 +22,10 @@ package org.elasticsearch.cluster.metadata;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.NodeAliasesUpdatedAction;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
@ -70,7 +71,18 @@ public class MetaDataIndexAliasesService extends AbstractComponent {
}
public void indicesAliases(final Request request, final Listener listener) {
clusterService.submitStateUpdateTask("index-aliases", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("index-aliases", Priority.URGENT, new TimeoutClusterStateUpdateTask() {
@Override
public TimeValue timeout() {
return request.masterTimeout;
}
@Override
public void onTimeout(TimeValue timeout, String source) {
listener.onFailure(new ProcessClusterEventTimeoutException(timeout, source));
}
@Override
public ClusterState execute(final ClusterState currentState) {
List<String> indicesToClose = Lists.newArrayList();
@ -182,7 +194,7 @@ public class MetaDataIndexAliasesService extends AbstractComponent {
return currentState;
}
} catch (Throwable t) {
listener.onResponse(new Response(true));
listener.onFailure(t);
return currentState;
} finally {
for (String index : indicesToClose) {
@ -209,11 +221,17 @@ public class MetaDataIndexAliasesService extends AbstractComponent {
final AliasAction[] actions;
final TimeValue timeout;
TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT;
public Request(AliasAction[] actions, TimeValue timeout) {
this.actions = actions;
this.timeout = timeout;
}
public Request masterTimeout(TimeValue masterTimeout) {
this.masterTimeout = masterTimeout;
return this;
}
}
public static class Response {

View File

@ -23,9 +23,10 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
@ -33,6 +34,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.indices.IndexTemplateAlreadyExistsException;
import org.elasticsearch.indices.IndexTemplateMissingException;
import org.elasticsearch.indices.InvalidIndexTemplateException;
@ -55,7 +57,18 @@ public class MetaDataIndexTemplateService extends AbstractComponent {
}
public void removeTemplates(final RemoveRequest request, final RemoveListener listener) {
clusterService.submitStateUpdateTask("remove-index-template [" + request.name + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("remove-index-template [" + request.name + "]", Priority.URGENT, new TimeoutClusterStateUpdateTask() {
@Override
public TimeValue timeout() {
return request.masterTimeout;
}
@Override
public void onTimeout(TimeValue timeout, String source) {
listener.onFailure(new ProcessClusterEventTimeoutException(timeout, source));
}
@Override
public ClusterState execute(ClusterState currentState) {
Set<String> templateNames = Sets.newHashSet();
@ -127,7 +140,18 @@ public class MetaDataIndexTemplateService extends AbstractComponent {
}
final IndexTemplateMetaData template = templateBuilder.build();
clusterService.submitStateUpdateTask("create-index-template [" + request.name + "], cause [" + request.cause + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("create-index-template [" + request.name + "], cause [" + request.cause + "]", Priority.URGENT, new TimeoutClusterStateUpdateTask() {
@Override
public TimeValue timeout() {
return request.masterTimeout;
}
@Override
public void onTimeout(TimeValue timeout, String source) {
listener.onFailure(new ProcessClusterEventTimeoutException(timeout, source));
}
@Override
public ClusterState execute(ClusterState currentState) {
if (request.create && currentState.metaData().templates().containsKey(request.name)) {
@ -197,6 +221,8 @@ public class MetaDataIndexTemplateService extends AbstractComponent {
Map<String, String> mappings = Maps.newHashMap();
Map<String, IndexMetaData.Custom> customs = Maps.newHashMap();
TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT;
public PutRequest(String cause, String name) {
this.cause = cause;
this.name = name;
@ -236,6 +262,11 @@ public class MetaDataIndexTemplateService extends AbstractComponent {
mappings.put(mappingType, mappingSource);
return this;
}
public PutRequest masterTimeout(TimeValue masterTimeout) {
this.masterTimeout = masterTimeout;
return this;
}
}
public static class PutResponse {
@ -258,10 +289,16 @@ public class MetaDataIndexTemplateService extends AbstractComponent {
public static class RemoveRequest {
final String name;
TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT;
public RemoveRequest(String name) {
this.name = name;
}
public RemoveRequest masterTimeout(TimeValue masterTimeout) {
this.masterTimeout = masterTimeout;
return this;
}
}
public static class RemoveResponse {

View File

@ -22,10 +22,8 @@ package org.elasticsearch.cluster.metadata;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.Priority;
@ -224,7 +222,17 @@ public class MetaDataMappingService extends AbstractComponent {
public void removeMapping(final RemoveRequest request, final Listener listener) {
final AtomicBoolean notifyOnPostProcess = new AtomicBoolean();
clusterService.submitStateUpdateTask("remove-mapping [" + request.mappingType + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("remove-mapping [" + request.mappingType + "]", Priority.URGENT, new TimeoutClusterStateUpdateTask() {
@Override
public TimeValue timeout() {
return request.masterTimeout;
}
@Override
public void onTimeout(TimeValue timeout, String source) {
listener.onFailure(new ProcessClusterEventTimeoutException(timeout, source));
}
@Override
public ClusterState execute(ClusterState currentState) {
if (request.indices.length == 0) {
@ -274,7 +282,17 @@ public class MetaDataMappingService extends AbstractComponent {
public void putMapping(final PutRequest request, final Listener listener) {
final AtomicBoolean notifyOnPostProcess = new AtomicBoolean();
clusterService.submitStateUpdateTask("put-mapping [" + request.mappingType + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("put-mapping [" + request.mappingType + "]", Priority.URGENT, new TimeoutClusterStateUpdateTask() {
@Override
public TimeValue timeout() {
return request.masterTimeout;
}
@Override
public void onTimeout(TimeValue timeout, String source) {
listener.onFailure(new ProcessClusterEventTimeoutException(timeout, source));
}
@Override
public ClusterState execute(ClusterState currentState) {
List<String> indicesToClose = Lists.newArrayList();
@ -434,13 +452,18 @@ public class MetaDataMappingService extends AbstractComponent {
public static class RemoveRequest {
final String[] indices;
final String mappingType;
TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT;
public RemoveRequest(String[] indices, String mappingType) {
this.indices = indices;
this.mappingType = mappingType;
}
public RemoveRequest masterTimeout(TimeValue masterTimeout) {
this.masterTimeout = masterTimeout;
return this;
}
}
public static class PutRequest {
@ -454,6 +477,7 @@ public class MetaDataMappingService extends AbstractComponent {
boolean ignoreConflicts = false;
TimeValue timeout = TimeValue.timeValueSeconds(10);
TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT;
public PutRequest(String[] indices, String mappingType, String mappingSource) {
this.indices = indices;
@ -470,6 +494,11 @@ public class MetaDataMappingService extends AbstractComponent {
this.timeout = timeout;
return this;
}
public PutRequest masterTimeout(TimeValue masterTimeout) {
this.masterTimeout = masterTimeout;
return this;
}
}
public static class Response {

View File

@ -20,9 +20,10 @@
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
@ -66,7 +67,18 @@ public class MetaDataStateIndexService extends AbstractComponent {
}
final String indicesAsString = Arrays.toString(request.indices);
clusterService.submitStateUpdateTask("close-indices " + indicesAsString, Priority.URGENT, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("close-indices " + indicesAsString, Priority.URGENT, new TimeoutClusterStateUpdateTask() {
@Override
public TimeValue timeout() {
return request.masterTimeout;
}
@Override
public void onTimeout(TimeValue timeout, String source) {
listener.onFailure(new ProcessClusterEventTimeoutException(timeout, source));
}
@Override
public ClusterState execute(ClusterState currentState) {
List<String> indicesToClose = new ArrayList<String>();
@ -123,7 +135,17 @@ public class MetaDataStateIndexService extends AbstractComponent {
}
final String indicesAsString = Arrays.toString(request.indices);
clusterService.submitStateUpdateTask("open-indices " + indicesAsString, Priority.URGENT, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("open-indices " + indicesAsString, Priority.URGENT, new TimeoutClusterStateUpdateTask() {
@Override
public TimeValue timeout() {
return request.masterTimeout;
}
@Override
public void onTimeout(TimeValue timeout, String source) {
listener.onFailure(new ProcessClusterEventTimeoutException(timeout, source));
}
@Override
public ClusterState execute(ClusterState currentState) {
List<String> indicesToOpen = new ArrayList<String>();
@ -186,6 +208,7 @@ public class MetaDataStateIndexService extends AbstractComponent {
final String[] indices;
TimeValue timeout = TimeValue.timeValueSeconds(10);
TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT;
public Request(String[] indices) {
this.indices = indices;
@ -195,6 +218,11 @@ public class MetaDataStateIndexService extends AbstractComponent {
this.timeout = timeout;
return this;
}
public Request masterTimeout(TimeValue masterTimeout) {
this.masterTimeout = masterTimeout;
return this;
}
}
public static class Response {

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.settings.IndexDynamicSettings;
import java.util.Locale;
@ -102,7 +103,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
if (numberOfReplicas >= min && numberOfReplicas <= max) {
final int fNumberOfReplicas = numberOfReplicas;
Settings settings = ImmutableSettings.settingsBuilder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, fNumberOfReplicas).build();
updateSettings(settings, new String[]{indexMetaData.index()}, new Listener() {
updateSettings(settings, new String[]{indexMetaData.index()}, TimeValue.timeValueMinutes(10), new Listener() {
@Override
public void onSuccess() {
logger.info("[{}] auto expanded replicas to [{}]", indexMetaData.index(), fNumberOfReplicas);
@ -121,7 +122,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
}
}
public void updateSettings(final Settings pSettings, final String[] indices, final Listener listener) {
public void updateSettings(final Settings pSettings, final String[] indices, final TimeValue masterTimeout, final Listener listener) {
ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder();
for (Map.Entry<String, String> entry : pSettings.getAsMap().entrySet()) {
if (entry.getKey().equals("index")) {
@ -168,7 +169,17 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
}
final Settings openSettings = updatedSettingsBuilder.build();
clusterService.submitStateUpdateTask("update-settings", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("update-settings", Priority.URGENT, new TimeoutClusterStateUpdateTask() {
@Override
public TimeValue timeout() {
return masterTimeout;
}
@Override
public void onTimeout(TimeValue timeout, String source) {
listener.onFailure(new ProcessClusterEventTimeoutException(timeout, source));
}
@Override
public ClusterState execute(ClusterState currentState) {
try {
@ -189,7 +200,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
}
if (!removedSettings.isEmpty() && !openIndices.isEmpty()) {
listener.onFailure(new ElasticSearchIllegalArgumentException(String.format(Locale.ROOT,
listener.onFailure(new ElasticSearchIllegalArgumentException(String.format(Locale.ROOT,
"Can't update non dynamic settings[%s] for open indices[%s]",
removedSettings,
openIndices

View File

@ -0,0 +1,38 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
/**
*/
public class ProcessClusterEventTimeoutException extends ElasticSearchException {
public ProcessClusterEventTimeoutException(TimeValue timeValue, String source) {
super("failed to process cluster event (" + source + ") within " + timeValue);
}
@Override
public RestStatus status() {
return RestStatus.SERVICE_UNAVAILABLE;
}
}

View File

@ -225,7 +225,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
timeoutUpdateTask.onTimeout(task.source);
timeoutUpdateTask.onTimeout(timeoutUpdateTask.timeout(), task.source);
}
});
}

View File

@ -52,6 +52,7 @@ public class RestClusterRerouteAction extends BaseRestHandler {
final ClusterRerouteRequest clusterRerouteRequest = Requests.clusterRerouteRequest();
clusterRerouteRequest.listenerThreaded(false);
clusterRerouteRequest.dryRun(request.paramAsBoolean("dry_run", clusterRerouteRequest.dryRun()));
clusterRerouteRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterRerouteRequest.masterNodeTimeout()));
if (request.hasContent()) {
try {
clusterRerouteRequest.source(request.content());

View File

@ -48,6 +48,7 @@ public class RestClusterUpdateSettingsAction extends BaseRestHandler {
public void handleRequest(final RestRequest request, final RestChannel channel) {
final ClusterUpdateSettingsRequest clusterUpdateSettingsRequest = Requests.clusterUpdateSettingsRequest();
clusterUpdateSettingsRequest.listenerThreaded(false);
clusterUpdateSettingsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterUpdateSettingsRequest.masterNodeTimeout()));
try {
Map<String, Object> source = XContentFactory.xContent(request.content()).createParser(request.content()).mapAndClose();
if (source.containsKey("transient")) {

View File

@ -56,6 +56,7 @@ public class RestIndicesAliasesAction extends BaseRestHandler {
public void handleRequest(final RestRequest request, final RestChannel channel) {
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
indicesAliasesRequest.listenerThreaded(false);
indicesAliasesRequest.masterNodeTimeout(request.paramAsTime("master_timeout", indicesAliasesRequest.masterNodeTimeout()));
XContentParser parser = null;
try {
// {

View File

@ -52,6 +52,7 @@ public class RestIndexDeleteAliasesAction extends BaseRestHandler {
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest();
indicesAliasesRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10)));
indicesAliasesRequest.removeAlias(index, alias);
indicesAliasesRequest.masterNodeTimeout(request.paramAsTime("master_timeout", indicesAliasesRequest.masterNodeTimeout()));
client.admin().indices().aliases(indicesAliasesRequest, new ActionListener<IndicesAliasesResponse>() {

View File

@ -110,6 +110,7 @@ public class RestIndexPutAliasAction extends BaseRestHandler {
indicesAliasesRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10)));
AliasAction aliasAction = new AliasAction(AliasAction.Type.ADD, index, alias);
indicesAliasesRequest.addAliasAction(aliasAction);
indicesAliasesRequest.masterNodeTimeout(request.paramAsTime("master_timeout", indicesAliasesRequest.masterNodeTimeout()));
if (routing != null) {
aliasAction.routing(routing);

View File

@ -53,6 +53,7 @@ public class RestCloseIndexAction extends BaseRestHandler {
public void handleRequest(final RestRequest request, final RestChannel channel) {
CloseIndexRequest closeIndexRequest = new CloseIndexRequest(splitIndices(request.param("index")));
closeIndexRequest.listenerThreaded(false);
closeIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", closeIndexRequest.masterNodeTimeout()));
closeIndexRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10)));
if (request.hasParam("ignore_indices")) {
closeIndexRequest.ignoreIndices(IgnoreIndices.fromString(request.param("ignore_indices")));

View File

@ -65,6 +65,7 @@ public class RestCreateIndexAction extends BaseRestHandler {
}
createIndexRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10)));
createIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", createIndexRequest.masterNodeTimeout()));
client.admin().indices().create(createIndexRequest, new ActionListener<CreateIndexResponse>() {
@Override

View File

@ -53,6 +53,7 @@ public class RestDeleteIndexAction extends BaseRestHandler {
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(splitIndices(request.param("index")));
deleteIndexRequest.listenerThreaded(false);
deleteIndexRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10)));
deleteIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteIndexRequest.masterNodeTimeout()));
client.admin().indices().delete(deleteIndexRequest, new ActionListener<DeleteIndexResponse>() {
@Override
public void onResponse(DeleteIndexResponse response) {

View File

@ -53,6 +53,7 @@ public class RestDeleteMappingAction extends BaseRestHandler {
DeleteMappingRequest deleteMappingRequest = deleteMappingRequest(splitIndices(request.param("index")));
deleteMappingRequest.listenerThreaded(false);
deleteMappingRequest.type(request.param("type"));
deleteMappingRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteMappingRequest.masterNodeTimeout()));
client.admin().indices().deleteMapping(deleteMappingRequest, new ActionListener<DeleteMappingResponse>() {
@Override
public void onResponse(DeleteMappingResponse response) {

View File

@ -61,6 +61,7 @@ public class RestPutMappingAction extends BaseRestHandler {
putMappingRequest.source(request.content().toUtf8());
putMappingRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10)));
putMappingRequest.ignoreConflicts(request.paramAsBoolean("ignore_conflicts", putMappingRequest.ignoreConflicts()));
putMappingRequest.masterNodeTimeout(request.paramAsTime("master_timeout", putMappingRequest.masterNodeTimeout()));
client.admin().indices().putMapping(putMappingRequest, new ActionListener<PutMappingResponse>() {
@Override
public void onResponse(PutMappingResponse response) {

View File

@ -54,6 +54,7 @@ public class RestOpenIndexAction extends BaseRestHandler {
OpenIndexRequest openIndexRequest = new OpenIndexRequest(splitIndices(request.param("index")));
openIndexRequest.listenerThreaded(false);
openIndexRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10)));
openIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", openIndexRequest.masterNodeTimeout()));
if (request.hasParam("ignore_indices")) {
openIndexRequest.ignoreIndices(IgnoreIndices.fromString(request.param("ignore_indices")));
}

View File

@ -56,6 +56,7 @@ public class RestUpdateSettingsAction extends BaseRestHandler {
public void handleRequest(final RestRequest request, final RestChannel channel) {
UpdateSettingsRequest updateSettingsRequest = updateSettingsRequest(splitIndices(request.param("index")));
updateSettingsRequest.listenerThreaded(false);
updateSettingsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", updateSettingsRequest.masterNodeTimeout()));
ImmutableSettings.Builder updateSettings = ImmutableSettings.settingsBuilder();
String bodySettingsStr = request.content().toUtf8();

View File

@ -51,6 +51,7 @@ public class RestDeleteIndexTemplateAction extends BaseRestHandler {
DeleteIndexTemplateRequest deleteIndexTemplateRequest = new DeleteIndexTemplateRequest(request.param("name"));
deleteIndexTemplateRequest.listenerThreaded(false);
deleteIndexTemplateRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10)));
deleteIndexTemplateRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteIndexTemplateRequest.masterNodeTimeout()));
client.admin().indices().deleteTemplate(deleteIndexTemplateRequest, new ActionListener<DeleteIndexTemplateResponse>() {
@Override
public void onResponse(DeleteIndexTemplateResponse response) {

View File

@ -62,6 +62,7 @@ public class RestPutIndexTemplateAction extends BaseRestHandler {
putRequest.listenerThreaded(false);
putRequest.template(request.param("template", putRequest.template()));
putRequest.order(request.paramAsInt("order", putRequest.order()));
putRequest.masterNodeTimeout(request.paramAsTime("master_timeout", putRequest.masterNodeTimeout()));
try {
putRequest.create(request.paramAsBoolean("create", false));

View File

@ -52,6 +52,7 @@ public class RestDeleteWarmerAction extends BaseRestHandler {
DeleteWarmerRequest deleteWarmerRequest = new DeleteWarmerRequest(request.param("name"))
.indices(RestActions.splitIndices(request.param("index")));
deleteWarmerRequest.listenerThreaded(false);
deleteWarmerRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteWarmerRequest.masterNodeTimeout()));
client.admin().indices().deleteWarmer(deleteWarmerRequest, new ActionListener<DeleteWarmerResponse>() {
@Override
public void onResponse(DeleteWarmerResponse response) {

View File

@ -55,6 +55,7 @@ public class RestPutWarmerAction extends BaseRestHandler {
.types(RestActions.splitTypes(request.param("type")))
.source(request.content(), request.contentUnsafe());
putWarmerRequest.searchRequest(searchRequest);
putWarmerRequest.masterNodeTimeout(request.paramAsTime("master_timeout", putWarmerRequest.masterNodeTimeout()));
client.admin().indices().putWarmer(putWarmerRequest, new ActionListener<PutWarmerResponse>() {
@Override
public void onResponse(PutWarmerResponse response) {

View File

@ -88,7 +88,7 @@ public class ClusterServiceTests extends AbstractZenNodesTests {
}
@Override
public void onTimeout(String source) {
public void onTimeout(TimeValue timeout, String source) {
timedOut.countDown();
}
@ -97,6 +97,10 @@ public class ClusterServiceTests extends AbstractZenNodesTests {
executeCalled.set(true);
return currentState;
}
@Override
public void clusterStateProcessed(ClusterState clusterState) {
}
});
assertThat(timedOut.await(500, TimeUnit.MILLISECONDS), equalTo(true));