From 235a68c3bdf65c2f0d6bebc63ebbb7737d949983 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Mon, 22 Jul 2013 16:58:00 +0200 Subject: [PATCH] Cluster State Update APIs (master node) to respect master_timeout better Currently, the master node might be processing too many cluster state events, and then be blocked on waiting for its respective even to be processed. We can use the new cluster state update timeout support to use the master_timeout value and respect it. closes #3365 --- .../TransportClusterRerouteAction.java | 20 +++++++-- .../TransportClusterUpdateSettingsAction.java | 23 +++++++--- .../alias/TransportIndicesAliasesAction.java | 2 +- .../close/TransportCloseIndexAction.java | 2 +- .../create/TransportCreateIndexAction.java | 3 +- .../delete/TransportDeleteIndexAction.java | 2 +- .../delete/TransportDeleteMappingAction.java | 4 +- .../put/TransportPutMappingAction.java | 2 +- .../open/TransportOpenIndexAction.java | 2 +- .../TransportUpdateSettingsAction.java | 2 +- .../TransportDeleteIndexTemplateAction.java | 2 +- .../put/TransportPutIndexTemplateAction.java | 3 +- .../delete/TransportDeleteWarmerAction.java | 18 +++++++- .../warmer/put/TransportPutWarmerAction.java | 18 +++++++- .../TimeoutClusterStateUpdateTask.java | 6 +-- .../metadata/MetaDataCreateIndexService.java | 23 ++++++++-- .../metadata/MetaDataDeleteIndexService.java | 26 ++++++++++- .../metadata/MetaDataIndexAliasesService.java | 24 +++++++++-- .../MetaDataIndexTemplateService.java | 43 +++++++++++++++++-- .../metadata/MetaDataMappingService.java | 43 ++++++++++++++++--- .../metadata/MetaDataStateIndexService.java | 34 +++++++++++++-- .../MetaDataUpdateSettingsService.java | 19 ++++++-- .../ProcessClusterEventTimeoutException.java | 38 ++++++++++++++++ .../service/InternalClusterService.java | 2 +- .../reroute/RestClusterRerouteAction.java | 1 + .../RestClusterUpdateSettingsAction.java | 1 + .../alias/RestIndicesAliasesAction.java | 1 + .../delete/RestIndexDeleteAliasesAction.java | 1 + .../alias/put/RestIndexPutAliasAction.java | 1 + .../indices/close/RestCloseIndexAction.java | 1 + .../indices/create/RestCreateIndexAction.java | 1 + .../indices/delete/RestDeleteIndexAction.java | 1 + .../delete/RestDeleteMappingAction.java | 1 + .../mapping/put/RestPutMappingAction.java | 1 + .../indices/open/RestOpenIndexAction.java | 1 + .../settings/RestUpdateSettingsAction.java | 1 + .../delete/RestDeleteIndexTemplateAction.java | 1 + .../put/RestPutIndexTemplateAction.java | 1 + .../warmer/delete/RestDeleteWarmerAction.java | 1 + .../warmer/put/RestPutWarmerAction.java | 1 + .../cluster/ClusterServiceTests.java | 6 ++- 41 files changed, 329 insertions(+), 54 deletions(-) create mode 100644 src/main/java/org/elasticsearch/cluster/metadata/ProcessClusterEventTimeoutException.java diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java index 243eaacd379..5dc0e99ade6 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java @@ -23,12 +23,14 @@ import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; +import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.Priority; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -76,7 +78,19 @@ public class TransportClusterRerouteAction extends TransportMasterNodeOperationA final AtomicReference clusterStateResponse = new AtomicReference(); final CountDownLatch latch = new CountDownLatch(1); - clusterService.submitStateUpdateTask("cluster_reroute (api)", Priority.URGENT, new ProcessedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("cluster_reroute (api)", Priority.URGENT, new TimeoutClusterStateUpdateTask() { + + @Override + public TimeValue timeout() { + return request.masterNodeTimeout(); + } + + @Override + public void onTimeout(TimeValue timeout, String source) { + failureRef.set(new ProcessClusterEventTimeoutException(timeout, source)); + latch.countDown(); + } + @Override public ClusterState execute(ClusterState currentState) { try { @@ -87,7 +101,7 @@ public class TransportClusterRerouteAction extends TransportMasterNodeOperationA return currentState; } return newState; - } catch (Exception e) { + } catch (Throwable e) { logger.debug("failed to reroute", e); failureRef.set(e); latch.countDown(); diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java index 721395b6f17..696da862676 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java @@ -24,9 +24,10 @@ import org.elasticsearch.action.support.master.TransportMasterNodeOperationActio import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; -import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; +import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.settings.ClusterDynamicSettings; @@ -35,9 +36,7 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -92,7 +91,19 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe final ImmutableSettings.Builder transientUpdates = ImmutableSettings.settingsBuilder(); final ImmutableSettings.Builder persistentUpdates = ImmutableSettings.settingsBuilder(); - clusterService.submitStateUpdateTask("cluster_update_settings", Priority.URGENT, new ProcessedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("cluster_update_settings", Priority.URGENT, new TimeoutClusterStateUpdateTask() { + + @Override + public TimeValue timeout() { + return request.masterNodeTimeout(); + } + + @Override + public void onTimeout(TimeValue timeout, String source) { + failureRef.set(new ProcessClusterEventTimeoutException(timeout, source)); + latch.countDown(); + } + @Override public ClusterState execute(ClusterState currentState) { try { @@ -149,7 +160,7 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe } return ClusterState.builder().state(currentState).metaData(metaData).blocks(blocks).build(); - } catch (Exception e) { + } catch (Throwable e) { latch.countDown(); logger.warn("failed to update cluster settings", e); return currentState; diff --git a/src/main/java/org/elasticsearch/action/admin/indices/alias/TransportIndicesAliasesAction.java b/src/main/java/org/elasticsearch/action/admin/indices/alias/TransportIndicesAliasesAction.java index 9b4d2b5486b..4236a8e8d1e 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/alias/TransportIndicesAliasesAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/alias/TransportIndicesAliasesAction.java @@ -85,7 +85,7 @@ public class TransportIndicesAliasesAction extends TransportMasterNodeOperationA final AtomicReference responseRef = new AtomicReference(); final AtomicReference failureRef = new AtomicReference(); final CountDownLatch latch = new CountDownLatch(1); - indexAliasesService.indicesAliases(new MetaDataIndexAliasesService.Request(request.aliasActions().toArray(new AliasAction[request.aliasActions().size()]), request.timeout()), new MetaDataIndexAliasesService.Listener() { + indexAliasesService.indicesAliases(new MetaDataIndexAliasesService.Request(request.aliasActions().toArray(new AliasAction[request.aliasActions().size()]), request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataIndexAliasesService.Listener() { @Override public void onResponse(MetaDataIndexAliasesService.Response response) { responseRef.set(new IndicesAliasesResponse(response.acknowledged())); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java b/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java index 15d87b97a99..4b7f26f683b 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java @@ -99,7 +99,7 @@ public class TransportCloseIndexAction extends TransportMasterNodeOperationActio final AtomicReference responseRef = new AtomicReference(); final AtomicReference failureRef = new AtomicReference(); final CountDownLatch latch = new CountDownLatch(1); - stateIndexService.closeIndex(new MetaDataStateIndexService.Request(request.indices()).timeout(request.timeout()), new MetaDataStateIndexService.Listener() { + stateIndexService.closeIndex(new MetaDataStateIndexService.Request(request.indices()).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataStateIndexService.Listener() { @Override public void onResponse(MetaDataStateIndexService.Response response) { responseRef.set(new CloseIndexResponse(response.acknowledged())); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java b/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java index 5168c3a2197..b663def407b 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java @@ -86,7 +86,8 @@ public class TransportCreateIndexAction extends TransportMasterNodeOperationActi createIndexService.createIndex(new MetaDataCreateIndexService.Request(cause, request.index()).settings(request.settings()) .mappings(request.mappings()) .customs(request.customs()) - .timeout(request.timeout()), + .timeout(request.timeout()) + .masterTimeout(request.masterNodeTimeout()), new MetaDataCreateIndexService.Listener() { @Override public void onResponse(MetaDataCreateIndexService.Response response) { diff --git a/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java b/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java index 4a13ae153c9..a3b21e191fd 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java @@ -108,7 +108,7 @@ public class TransportDeleteIndexAction extends TransportMasterNodeOperationActi final AtomicReference failureRef = new AtomicReference(); final CountDownLatch latch = new CountDownLatch(request.indices().length); for (final String index : request.indices()) { - deleteIndexService.deleteIndex(new MetaDataDeleteIndexService.Request(index).timeout(request.timeout()), new MetaDataDeleteIndexService.Listener() { + deleteIndexService.deleteIndex(new MetaDataDeleteIndexService.Request(index).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataDeleteIndexService.Listener() { @Override public void onResponse(MetaDataDeleteIndexService.Response response) { responseRef.set(new DeleteIndexResponse(response.acknowledged())); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/TransportDeleteMappingAction.java b/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/TransportDeleteMappingAction.java index c8acea7b970..18b66f0fcff 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/TransportDeleteMappingAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/mapping/delete/TransportDeleteMappingAction.java @@ -114,7 +114,7 @@ public class TransportDeleteMappingAction extends TransportMasterNodeOperationAc refreshAction.execute(Requests.refreshRequest(request.indices()), new ActionListener() { @Override public void onResponse(RefreshResponse refreshResponse) { - metaDataMappingService.removeMapping(new MetaDataMappingService.RemoveRequest(request.indices(), request.type()), new MetaDataMappingService.Listener() { + metaDataMappingService.removeMapping(new MetaDataMappingService.RemoveRequest(request.indices(), request.type()).masterTimeout(request.masterNodeTimeout()), new MetaDataMappingService.Listener() { @Override public void onResponse(MetaDataMappingService.Response response) { latch.countDown(); @@ -130,7 +130,7 @@ public class TransportDeleteMappingAction extends TransportMasterNodeOperationAc @Override public void onFailure(Throwable e) { - metaDataMappingService.removeMapping(new MetaDataMappingService.RemoveRequest(request.indices(), request.type()), new MetaDataMappingService.Listener() { + metaDataMappingService.removeMapping(new MetaDataMappingService.RemoveRequest(request.indices(), request.type()).masterTimeout(request.masterNodeTimeout()), new MetaDataMappingService.Listener() { @Override public void onResponse(MetaDataMappingService.Response response) { latch.countDown(); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java b/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java index ad68088e22a..078976592eb 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java @@ -90,7 +90,7 @@ public class TransportPutMappingAction extends TransportMasterNodeOperationActio final AtomicReference responseRef = new AtomicReference(); final AtomicReference failureRef = new AtomicReference(); final CountDownLatch latch = new CountDownLatch(1); - metaDataMappingService.putMapping(new MetaDataMappingService.PutRequest(request.indices(), request.type(), request.source()).ignoreConflicts(request.ignoreConflicts()).timeout(request.timeout()), new MetaDataMappingService.Listener() { + metaDataMappingService.putMapping(new MetaDataMappingService.PutRequest(request.indices(), request.type(), request.source()).ignoreConflicts(request.ignoreConflicts()).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataMappingService.Listener() { @Override public void onResponse(MetaDataMappingService.Response response) { responseRef.set(new PutMappingResponse(response.acknowledged())); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/open/TransportOpenIndexAction.java b/src/main/java/org/elasticsearch/action/admin/indices/open/TransportOpenIndexAction.java index 445147594a4..c0425194a6d 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/open/TransportOpenIndexAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/open/TransportOpenIndexAction.java @@ -85,7 +85,7 @@ public class TransportOpenIndexAction extends TransportMasterNodeOperationAction final AtomicReference responseRef = new AtomicReference(); final AtomicReference failureRef = new AtomicReference(); final CountDownLatch latch = new CountDownLatch(1); - stateIndexService.openIndex(new MetaDataStateIndexService.Request(request.indices()).timeout(request.timeout()), new MetaDataStateIndexService.Listener() { + stateIndexService.openIndex(new MetaDataStateIndexService.Request(request.indices()).timeout(request.timeout()).masterTimeout(request.masterNodeTimeout()), new MetaDataStateIndexService.Listener() { @Override public void onResponse(MetaDataStateIndexService.Response response) { responseRef.set(new OpenIndexResponse(response.acknowledged())); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/settings/TransportUpdateSettingsAction.java b/src/main/java/org/elasticsearch/action/admin/indices/settings/TransportUpdateSettingsAction.java index 3ec03ce6b2a..350b7849059 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/settings/TransportUpdateSettingsAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/settings/TransportUpdateSettingsAction.java @@ -71,7 +71,7 @@ public class TransportUpdateSettingsAction extends TransportMasterNodeOperationA final AtomicReference failureRef = new AtomicReference(); final CountDownLatch latch = new CountDownLatch(1); - updateSettingsService.updateSettings(request.settings(), request.indices(), new MetaDataUpdateSettingsService.Listener() { + updateSettingsService.updateSettings(request.settings(), request.indices(), request.masterNodeTimeout(), new MetaDataUpdateSettingsService.Listener() { @Override public void onSuccess() { latch.countDown(); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/template/delete/TransportDeleteIndexTemplateAction.java b/src/main/java/org/elasticsearch/action/admin/indices/template/delete/TransportDeleteIndexTemplateAction.java index a18d3b60856..1e9148b6973 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/template/delete/TransportDeleteIndexTemplateAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/template/delete/TransportDeleteIndexTemplateAction.java @@ -79,7 +79,7 @@ public class TransportDeleteIndexTemplateAction extends TransportMasterNodeOpera final AtomicReference failureRef = new AtomicReference(); final CountDownLatch latch = new CountDownLatch(1); - indexTemplateService.removeTemplates(new MetaDataIndexTemplateService.RemoveRequest(request.name()), new MetaDataIndexTemplateService.RemoveListener() { + indexTemplateService.removeTemplates(new MetaDataIndexTemplateService.RemoveRequest(request.name()).masterTimeout(request.masterNodeTimeout()), new MetaDataIndexTemplateService.RemoveListener() { @Override public void onResponse(MetaDataIndexTemplateService.RemoveResponse response) { responseRef.set(new DeleteIndexTemplateResponse(response.acknowledged())); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/template/put/TransportPutIndexTemplateAction.java b/src/main/java/org/elasticsearch/action/admin/indices/template/put/TransportPutIndexTemplateAction.java index ddc327b3a04..cfabe80fe62 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/template/put/TransportPutIndexTemplateAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/template/put/TransportPutIndexTemplateAction.java @@ -89,7 +89,8 @@ public class TransportPutIndexTemplateAction extends TransportMasterNodeOperatio .settings(request.settings()) .mappings(request.mappings()) .customs(request.customs()) - .create(request.create()), + .create(request.create()) + .masterTimeout(request.masterNodeTimeout()), new MetaDataIndexTemplateService.PutListener() { @Override diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/TransportDeleteWarmerAction.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/TransportDeleteWarmerAction.java index 3ea1cb30b13..46b8bb3318a 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/TransportDeleteWarmerAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/delete/TransportDeleteWarmerAction.java @@ -25,14 +25,16 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; +import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.search.warmer.IndexWarmerMissingException; @@ -91,7 +93,19 @@ public class TransportDeleteWarmerAction extends TransportMasterNodeOperationAct final AtomicReference failureRef = new AtomicReference(); final CountDownLatch latch = new CountDownLatch(1); - clusterService.submitStateUpdateTask("delete_warmer [" + request.name() + "]", new ProcessedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("delete_warmer [" + request.name() + "]", new TimeoutClusterStateUpdateTask() { + + @Override + public TimeValue timeout() { + return request.masterNodeTimeout(); + } + + @Override + public void onTimeout(TimeValue timeout, String source) { + failureRef.set(new ProcessClusterEventTimeoutException(timeout, source)); + latch.countDown(); + } + @Override public ClusterState execute(ClusterState currentState) { try { diff --git a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/TransportPutWarmerAction.java b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/TransportPutWarmerAction.java index da30cd0dfbf..fe6f1f64843 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/TransportPutWarmerAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/warmer/put/TransportPutWarmerAction.java @@ -25,14 +25,16 @@ import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; +import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.search.warmer.IndexWarmersMetaData; @@ -101,7 +103,19 @@ public class TransportPutWarmerAction extends TransportMasterNodeOperationAction final AtomicReference failureRef = new AtomicReference(); final CountDownLatch latch = new CountDownLatch(1); - clusterService.submitStateUpdateTask("put_warmer [" + request.name() + "]", new ProcessedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("put_warmer [" + request.name() + "]", new TimeoutClusterStateUpdateTask() { + + @Override + public TimeValue timeout() { + return request.masterNodeTimeout(); + } + + @Override + public void onTimeout(TimeValue timeout, String source) { + failureRef.set(new ProcessClusterEventTimeoutException(timeout, source)); + latch.countDown(); + } + @Override public ClusterState execute(ClusterState currentState) { MetaData metaData = currentState.metaData(); diff --git a/src/main/java/org/elasticsearch/cluster/TimeoutClusterStateUpdateTask.java b/src/main/java/org/elasticsearch/cluster/TimeoutClusterStateUpdateTask.java index 4b3fe131d3b..b99d10e6ab8 100644 --- a/src/main/java/org/elasticsearch/cluster/TimeoutClusterStateUpdateTask.java +++ b/src/main/java/org/elasticsearch/cluster/TimeoutClusterStateUpdateTask.java @@ -25,11 +25,11 @@ import org.elasticsearch.common.unit.TimeValue; * An extension interface to {@link org.elasticsearch.cluster.ClusterStateUpdateTask} that allows to associate * a timeout. */ -public interface TimeoutClusterStateUpdateTask extends ClusterStateUpdateTask { +public interface TimeoutClusterStateUpdateTask extends ProcessedClusterStateUpdateTask { /** * If the cluster state update task wasn't processed by the provided timeout, call - * {@link #onTimeout(String)}. + * {@link #onTimeout(TimeValue, String)}. */ TimeValue timeout(); @@ -37,5 +37,5 @@ public interface TimeoutClusterStateUpdateTask extends ClusterStateUpdateTask { * Called when the cluster sate update task wasn't processed by the provided * {@link #timeout()}. */ - void onTimeout(String source); + void onTimeout(TimeValue timeout, String source); } diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index 459bc7211dd..bec4a2e345c 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -27,9 +27,10 @@ import com.google.common.io.Closeables; import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.Version; +import org.elasticsearch.action.support.master.MasterNodeOperationRequest; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; +import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask; import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -134,7 +135,18 @@ public class MetaDataCreateIndexService extends AbstractComponent { final CreateIndexListener listener = new CreateIndexListener(mdLock, request, userListener); - clusterService.submitStateUpdateTask("create-index [" + request.index + "], cause [" + request.cause + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("create-index [" + request.index + "], cause [" + request.cause + "]", Priority.URGENT, new TimeoutClusterStateUpdateTask() { + + @Override + public TimeValue timeout() { + return request.masterTimeout; + } + + @Override + public void onTimeout(TimeValue timeout, String source) { + listener.onFailure(new ProcessClusterEventTimeoutException(timeout, source)); + } + @Override public ClusterState execute(ClusterState currentState) { boolean indexCreated = false; @@ -502,7 +514,6 @@ public class MetaDataCreateIndexService extends AbstractComponent { public static class Request { final String cause; - final String index; State state = State.OPEN; @@ -515,6 +526,7 @@ public class MetaDataCreateIndexService extends AbstractComponent { TimeValue timeout = TimeValue.timeValueSeconds(5); + TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT; Set blocks = Sets.newHashSet(); @@ -566,6 +578,11 @@ public class MetaDataCreateIndexService extends AbstractComponent { this.timeout = timeout; return this; } + + public Request masterTimeout(TimeValue masterTimeout) { + this.masterTimeout = masterTimeout; + return this; + } } public static class Response { diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java index 851efa8df7b..a50992aff36 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java @@ -19,9 +19,10 @@ package org.elasticsearch.cluster.metadata; +import org.elasticsearch.action.support.master.MasterNodeOperationRequest; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask; import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.routing.RoutingTable; @@ -81,7 +82,18 @@ public class MetaDataDeleteIndexService extends AbstractComponent { } final DeleteIndexListener listener = new DeleteIndexListener(mdLock, request, userListener); - clusterService.submitStateUpdateTask("delete-index [" + request.index + "]", Priority.URGENT, new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("delete-index [" + request.index + "]", Priority.URGENT, new TimeoutClusterStateUpdateTask() { + + @Override + public TimeValue timeout() { + return request.masterTimeout; + } + + @Override + public void onTimeout(TimeValue timeout, String source) { + listener.onFailure(new ProcessClusterEventTimeoutException(timeout, source)); + } + @Override public ClusterState execute(ClusterState currentState) { try { @@ -134,6 +146,10 @@ public class MetaDataDeleteIndexService extends AbstractComponent { return currentState; } } + + @Override + public void clusterStateProcessed(ClusterState clusterState) { + } }); } @@ -191,6 +207,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent { final String index; TimeValue timeout = TimeValue.timeValueSeconds(10); + TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT; public Request(String index) { this.index = index; @@ -200,6 +217,11 @@ public class MetaDataDeleteIndexService extends AbstractComponent { this.timeout = timeout; return this; } + + public Request masterTimeout(TimeValue masterTimeout) { + this.masterTimeout = masterTimeout; + return this; + } } public static class Response { diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java index d1463540de0..8f9452017d2 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexAliasesService.java @@ -22,9 +22,10 @@ package org.elasticsearch.cluster.metadata; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.elasticsearch.ElasticSearchIllegalArgumentException; +import org.elasticsearch.action.support.master.MasterNodeOperationRequest; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; +import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask; import org.elasticsearch.cluster.action.index.NodeAliasesUpdatedAction; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; @@ -70,7 +71,18 @@ public class MetaDataIndexAliasesService extends AbstractComponent { } public void indicesAliases(final Request request, final Listener listener) { - clusterService.submitStateUpdateTask("index-aliases", Priority.URGENT, new ProcessedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("index-aliases", Priority.URGENT, new TimeoutClusterStateUpdateTask() { + + @Override + public TimeValue timeout() { + return request.masterTimeout; + } + + @Override + public void onTimeout(TimeValue timeout, String source) { + listener.onFailure(new ProcessClusterEventTimeoutException(timeout, source)); + } + @Override public ClusterState execute(final ClusterState currentState) { List indicesToClose = Lists.newArrayList(); @@ -182,7 +194,7 @@ public class MetaDataIndexAliasesService extends AbstractComponent { return currentState; } } catch (Throwable t) { - listener.onResponse(new Response(true)); + listener.onFailure(t); return currentState; } finally { for (String index : indicesToClose) { @@ -209,11 +221,17 @@ public class MetaDataIndexAliasesService extends AbstractComponent { final AliasAction[] actions; final TimeValue timeout; + TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT; public Request(AliasAction[] actions, TimeValue timeout) { this.actions = actions; this.timeout = timeout; } + + public Request masterTimeout(TimeValue masterTimeout) { + this.masterTimeout = masterTimeout; + return this; + } } public static class Response { diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexTemplateService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexTemplateService.java index 298f2067652..fcae80df1bd 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexTemplateService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexTemplateService.java @@ -23,9 +23,10 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchIllegalArgumentException; +import org.elasticsearch.action.support.master.MasterNodeOperationRequest; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; +import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.component.AbstractComponent; @@ -33,6 +34,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.indices.IndexTemplateAlreadyExistsException; import org.elasticsearch.indices.IndexTemplateMissingException; import org.elasticsearch.indices.InvalidIndexTemplateException; @@ -55,7 +57,18 @@ public class MetaDataIndexTemplateService extends AbstractComponent { } public void removeTemplates(final RemoveRequest request, final RemoveListener listener) { - clusterService.submitStateUpdateTask("remove-index-template [" + request.name + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("remove-index-template [" + request.name + "]", Priority.URGENT, new TimeoutClusterStateUpdateTask() { + + @Override + public TimeValue timeout() { + return request.masterTimeout; + } + + @Override + public void onTimeout(TimeValue timeout, String source) { + listener.onFailure(new ProcessClusterEventTimeoutException(timeout, source)); + } + @Override public ClusterState execute(ClusterState currentState) { Set templateNames = Sets.newHashSet(); @@ -127,7 +140,18 @@ public class MetaDataIndexTemplateService extends AbstractComponent { } final IndexTemplateMetaData template = templateBuilder.build(); - clusterService.submitStateUpdateTask("create-index-template [" + request.name + "], cause [" + request.cause + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("create-index-template [" + request.name + "], cause [" + request.cause + "]", Priority.URGENT, new TimeoutClusterStateUpdateTask() { + + @Override + public TimeValue timeout() { + return request.masterTimeout; + } + + @Override + public void onTimeout(TimeValue timeout, String source) { + listener.onFailure(new ProcessClusterEventTimeoutException(timeout, source)); + } + @Override public ClusterState execute(ClusterState currentState) { if (request.create && currentState.metaData().templates().containsKey(request.name)) { @@ -197,6 +221,8 @@ public class MetaDataIndexTemplateService extends AbstractComponent { Map mappings = Maps.newHashMap(); Map customs = Maps.newHashMap(); + TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT; + public PutRequest(String cause, String name) { this.cause = cause; this.name = name; @@ -236,6 +262,11 @@ public class MetaDataIndexTemplateService extends AbstractComponent { mappings.put(mappingType, mappingSource); return this; } + + public PutRequest masterTimeout(TimeValue masterTimeout) { + this.masterTimeout = masterTimeout; + return this; + } } public static class PutResponse { @@ -258,10 +289,16 @@ public class MetaDataIndexTemplateService extends AbstractComponent { public static class RemoveRequest { final String name; + TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT; public RemoveRequest(String name) { this.name = name; } + + public RemoveRequest masterTimeout(TimeValue masterTimeout) { + this.masterTimeout = masterTimeout; + return this; + } } public static class RemoveResponse { diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java index 67b64df310e..44de6151f8e 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java @@ -22,10 +22,8 @@ package org.elasticsearch.cluster.metadata; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; -import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; +import org.elasticsearch.action.support.master.MasterNodeOperationRequest; +import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.common.Priority; @@ -224,7 +222,17 @@ public class MetaDataMappingService extends AbstractComponent { public void removeMapping(final RemoveRequest request, final Listener listener) { final AtomicBoolean notifyOnPostProcess = new AtomicBoolean(); - clusterService.submitStateUpdateTask("remove-mapping [" + request.mappingType + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("remove-mapping [" + request.mappingType + "]", Priority.URGENT, new TimeoutClusterStateUpdateTask() { + @Override + public TimeValue timeout() { + return request.masterTimeout; + } + + @Override + public void onTimeout(TimeValue timeout, String source) { + listener.onFailure(new ProcessClusterEventTimeoutException(timeout, source)); + } + @Override public ClusterState execute(ClusterState currentState) { if (request.indices.length == 0) { @@ -274,7 +282,17 @@ public class MetaDataMappingService extends AbstractComponent { public void putMapping(final PutRequest request, final Listener listener) { final AtomicBoolean notifyOnPostProcess = new AtomicBoolean(); - clusterService.submitStateUpdateTask("put-mapping [" + request.mappingType + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("put-mapping [" + request.mappingType + "]", Priority.URGENT, new TimeoutClusterStateUpdateTask() { + @Override + public TimeValue timeout() { + return request.masterTimeout; + } + + @Override + public void onTimeout(TimeValue timeout, String source) { + listener.onFailure(new ProcessClusterEventTimeoutException(timeout, source)); + } + @Override public ClusterState execute(ClusterState currentState) { List indicesToClose = Lists.newArrayList(); @@ -434,13 +452,18 @@ public class MetaDataMappingService extends AbstractComponent { public static class RemoveRequest { final String[] indices; - final String mappingType; + TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT; public RemoveRequest(String[] indices, String mappingType) { this.indices = indices; this.mappingType = mappingType; } + + public RemoveRequest masterTimeout(TimeValue masterTimeout) { + this.masterTimeout = masterTimeout; + return this; + } } public static class PutRequest { @@ -454,6 +477,7 @@ public class MetaDataMappingService extends AbstractComponent { boolean ignoreConflicts = false; TimeValue timeout = TimeValue.timeValueSeconds(10); + TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT; public PutRequest(String[] indices, String mappingType, String mappingSource) { this.indices = indices; @@ -470,6 +494,11 @@ public class MetaDataMappingService extends AbstractComponent { this.timeout = timeout; return this; } + + public PutRequest masterTimeout(TimeValue masterTimeout) { + this.masterTimeout = masterTimeout; + return this; + } } public static class Response { diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataStateIndexService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataStateIndexService.java index ebffcd307ff..71d754d0e29 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataStateIndexService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataStateIndexService.java @@ -20,9 +20,10 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.ElasticSearchIllegalArgumentException; +import org.elasticsearch.action.support.master.MasterNodeOperationRequest; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; +import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -66,7 +67,18 @@ public class MetaDataStateIndexService extends AbstractComponent { } final String indicesAsString = Arrays.toString(request.indices); - clusterService.submitStateUpdateTask("close-indices " + indicesAsString, Priority.URGENT, new ProcessedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("close-indices " + indicesAsString, Priority.URGENT, new TimeoutClusterStateUpdateTask() { + + @Override + public TimeValue timeout() { + return request.masterTimeout; + } + + @Override + public void onTimeout(TimeValue timeout, String source) { + listener.onFailure(new ProcessClusterEventTimeoutException(timeout, source)); + } + @Override public ClusterState execute(ClusterState currentState) { List indicesToClose = new ArrayList(); @@ -123,7 +135,17 @@ public class MetaDataStateIndexService extends AbstractComponent { } final String indicesAsString = Arrays.toString(request.indices); - clusterService.submitStateUpdateTask("open-indices " + indicesAsString, Priority.URGENT, new ProcessedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("open-indices " + indicesAsString, Priority.URGENT, new TimeoutClusterStateUpdateTask() { + @Override + public TimeValue timeout() { + return request.masterTimeout; + } + + @Override + public void onTimeout(TimeValue timeout, String source) { + listener.onFailure(new ProcessClusterEventTimeoutException(timeout, source)); + } + @Override public ClusterState execute(ClusterState currentState) { List indicesToOpen = new ArrayList(); @@ -186,6 +208,7 @@ public class MetaDataStateIndexService extends AbstractComponent { final String[] indices; TimeValue timeout = TimeValue.timeValueSeconds(10); + TimeValue masterTimeout = MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT; public Request(String[] indices) { this.indices = indices; @@ -195,6 +218,11 @@ public class MetaDataStateIndexService extends AbstractComponent { this.timeout = timeout; return this; } + + public Request masterTimeout(TimeValue masterTimeout) { + this.masterTimeout = masterTimeout; + return this; + } } public static class Response { diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java index 0b3820ab594..3dd4d22df13 100644 --- a/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java +++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaDataUpdateSettingsService.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.settings.IndexDynamicSettings; import java.util.Locale; @@ -102,7 +103,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements if (numberOfReplicas >= min && numberOfReplicas <= max) { final int fNumberOfReplicas = numberOfReplicas; Settings settings = ImmutableSettings.settingsBuilder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, fNumberOfReplicas).build(); - updateSettings(settings, new String[]{indexMetaData.index()}, new Listener() { + updateSettings(settings, new String[]{indexMetaData.index()}, TimeValue.timeValueMinutes(10), new Listener() { @Override public void onSuccess() { logger.info("[{}] auto expanded replicas to [{}]", indexMetaData.index(), fNumberOfReplicas); @@ -121,7 +122,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements } } - public void updateSettings(final Settings pSettings, final String[] indices, final Listener listener) { + public void updateSettings(final Settings pSettings, final String[] indices, final TimeValue masterTimeout, final Listener listener) { ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder(); for (Map.Entry entry : pSettings.getAsMap().entrySet()) { if (entry.getKey().equals("index")) { @@ -168,7 +169,17 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements } final Settings openSettings = updatedSettingsBuilder.build(); - clusterService.submitStateUpdateTask("update-settings", Priority.URGENT, new ProcessedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("update-settings", Priority.URGENT, new TimeoutClusterStateUpdateTask() { + @Override + public TimeValue timeout() { + return masterTimeout; + } + + @Override + public void onTimeout(TimeValue timeout, String source) { + listener.onFailure(new ProcessClusterEventTimeoutException(timeout, source)); + } + @Override public ClusterState execute(ClusterState currentState) { try { @@ -189,7 +200,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements } if (!removedSettings.isEmpty() && !openIndices.isEmpty()) { - listener.onFailure(new ElasticSearchIllegalArgumentException(String.format(Locale.ROOT, + listener.onFailure(new ElasticSearchIllegalArgumentException(String.format(Locale.ROOT, "Can't update non dynamic settings[%s] for open indices[%s]", removedSettings, openIndices diff --git a/src/main/java/org/elasticsearch/cluster/metadata/ProcessClusterEventTimeoutException.java b/src/main/java/org/elasticsearch/cluster/metadata/ProcessClusterEventTimeoutException.java new file mode 100644 index 00000000000..efe9f0ee663 --- /dev/null +++ b/src/main/java/org/elasticsearch/cluster/metadata/ProcessClusterEventTimeoutException.java @@ -0,0 +1,38 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.rest.RestStatus; + +/** + */ +public class ProcessClusterEventTimeoutException extends ElasticSearchException { + + public ProcessClusterEventTimeoutException(TimeValue timeValue, String source) { + super("failed to process cluster event (" + source + ") within " + timeValue); + } + + @Override + public RestStatus status() { + return RestStatus.SERVICE_UNAVAILABLE; + } +} diff --git a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index 1efe7ab290e..330ee9a8e8b 100644 --- a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -225,7 +225,7 @@ public class InternalClusterService extends AbstractLifecycleComponent source = XContentFactory.xContent(request.content()).createParser(request.content()).mapAndClose(); if (source.containsKey("transient")) { diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/alias/RestIndicesAliasesAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/alias/RestIndicesAliasesAction.java index 91728162e0b..357f032c1b7 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/indices/alias/RestIndicesAliasesAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/alias/RestIndicesAliasesAction.java @@ -56,6 +56,7 @@ public class RestIndicesAliasesAction extends BaseRestHandler { public void handleRequest(final RestRequest request, final RestChannel channel) { IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest(); indicesAliasesRequest.listenerThreaded(false); + indicesAliasesRequest.masterNodeTimeout(request.paramAsTime("master_timeout", indicesAliasesRequest.masterNodeTimeout())); XContentParser parser = null; try { // { diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/alias/delete/RestIndexDeleteAliasesAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/alias/delete/RestIndexDeleteAliasesAction.java index e4769cab083..3e4da956fa1 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/indices/alias/delete/RestIndexDeleteAliasesAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/alias/delete/RestIndexDeleteAliasesAction.java @@ -52,6 +52,7 @@ public class RestIndexDeleteAliasesAction extends BaseRestHandler { IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest(); indicesAliasesRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10))); indicesAliasesRequest.removeAlias(index, alias); + indicesAliasesRequest.masterNodeTimeout(request.paramAsTime("master_timeout", indicesAliasesRequest.masterNodeTimeout())); client.admin().indices().aliases(indicesAliasesRequest, new ActionListener() { diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/alias/put/RestIndexPutAliasAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/alias/put/RestIndexPutAliasAction.java index 5e0e773426c..8023510fb2a 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/indices/alias/put/RestIndexPutAliasAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/alias/put/RestIndexPutAliasAction.java @@ -110,6 +110,7 @@ public class RestIndexPutAliasAction extends BaseRestHandler { indicesAliasesRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10))); AliasAction aliasAction = new AliasAction(AliasAction.Type.ADD, index, alias); indicesAliasesRequest.addAliasAction(aliasAction); + indicesAliasesRequest.masterNodeTimeout(request.paramAsTime("master_timeout", indicesAliasesRequest.masterNodeTimeout())); if (routing != null) { aliasAction.routing(routing); diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/close/RestCloseIndexAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/close/RestCloseIndexAction.java index 9e9afaffc46..573ab9b253a 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/indices/close/RestCloseIndexAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/close/RestCloseIndexAction.java @@ -53,6 +53,7 @@ public class RestCloseIndexAction extends BaseRestHandler { public void handleRequest(final RestRequest request, final RestChannel channel) { CloseIndexRequest closeIndexRequest = new CloseIndexRequest(splitIndices(request.param("index"))); closeIndexRequest.listenerThreaded(false); + closeIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", closeIndexRequest.masterNodeTimeout())); closeIndexRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10))); if (request.hasParam("ignore_indices")) { closeIndexRequest.ignoreIndices(IgnoreIndices.fromString(request.param("ignore_indices"))); diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/create/RestCreateIndexAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/create/RestCreateIndexAction.java index d91699ad920..21abac112f6 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/indices/create/RestCreateIndexAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/create/RestCreateIndexAction.java @@ -65,6 +65,7 @@ public class RestCreateIndexAction extends BaseRestHandler { } createIndexRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10))); + createIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", createIndexRequest.masterNodeTimeout())); client.admin().indices().create(createIndexRequest, new ActionListener() { @Override diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/delete/RestDeleteIndexAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/delete/RestDeleteIndexAction.java index 97a4879598b..0a53dae0674 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/indices/delete/RestDeleteIndexAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/delete/RestDeleteIndexAction.java @@ -53,6 +53,7 @@ public class RestDeleteIndexAction extends BaseRestHandler { DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(splitIndices(request.param("index"))); deleteIndexRequest.listenerThreaded(false); deleteIndexRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10))); + deleteIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteIndexRequest.masterNodeTimeout())); client.admin().indices().delete(deleteIndexRequest, new ActionListener() { @Override public void onResponse(DeleteIndexResponse response) { diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/mapping/delete/RestDeleteMappingAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/mapping/delete/RestDeleteMappingAction.java index ac096fa9003..cb3310aa03f 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/indices/mapping/delete/RestDeleteMappingAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/mapping/delete/RestDeleteMappingAction.java @@ -53,6 +53,7 @@ public class RestDeleteMappingAction extends BaseRestHandler { DeleteMappingRequest deleteMappingRequest = deleteMappingRequest(splitIndices(request.param("index"))); deleteMappingRequest.listenerThreaded(false); deleteMappingRequest.type(request.param("type")); + deleteMappingRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteMappingRequest.masterNodeTimeout())); client.admin().indices().deleteMapping(deleteMappingRequest, new ActionListener() { @Override public void onResponse(DeleteMappingResponse response) { diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/mapping/put/RestPutMappingAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/mapping/put/RestPutMappingAction.java index 9bef144e9b7..5da59a8d31f 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/indices/mapping/put/RestPutMappingAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/mapping/put/RestPutMappingAction.java @@ -61,6 +61,7 @@ public class RestPutMappingAction extends BaseRestHandler { putMappingRequest.source(request.content().toUtf8()); putMappingRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10))); putMappingRequest.ignoreConflicts(request.paramAsBoolean("ignore_conflicts", putMappingRequest.ignoreConflicts())); + putMappingRequest.masterNodeTimeout(request.paramAsTime("master_timeout", putMappingRequest.masterNodeTimeout())); client.admin().indices().putMapping(putMappingRequest, new ActionListener() { @Override public void onResponse(PutMappingResponse response) { diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/open/RestOpenIndexAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/open/RestOpenIndexAction.java index 62428f94574..9dc2e61bdf4 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/indices/open/RestOpenIndexAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/open/RestOpenIndexAction.java @@ -54,6 +54,7 @@ public class RestOpenIndexAction extends BaseRestHandler { OpenIndexRequest openIndexRequest = new OpenIndexRequest(splitIndices(request.param("index"))); openIndexRequest.listenerThreaded(false); openIndexRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10))); + openIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", openIndexRequest.masterNodeTimeout())); if (request.hasParam("ignore_indices")) { openIndexRequest.ignoreIndices(IgnoreIndices.fromString(request.param("ignore_indices"))); } diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/settings/RestUpdateSettingsAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/settings/RestUpdateSettingsAction.java index 80a16c08a12..432cc58b229 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/indices/settings/RestUpdateSettingsAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/settings/RestUpdateSettingsAction.java @@ -56,6 +56,7 @@ public class RestUpdateSettingsAction extends BaseRestHandler { public void handleRequest(final RestRequest request, final RestChannel channel) { UpdateSettingsRequest updateSettingsRequest = updateSettingsRequest(splitIndices(request.param("index"))); updateSettingsRequest.listenerThreaded(false); + updateSettingsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", updateSettingsRequest.masterNodeTimeout())); ImmutableSettings.Builder updateSettings = ImmutableSettings.settingsBuilder(); String bodySettingsStr = request.content().toUtf8(); diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/template/delete/RestDeleteIndexTemplateAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/template/delete/RestDeleteIndexTemplateAction.java index 559ed45d4e1..2f78c3c8751 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/indices/template/delete/RestDeleteIndexTemplateAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/template/delete/RestDeleteIndexTemplateAction.java @@ -51,6 +51,7 @@ public class RestDeleteIndexTemplateAction extends BaseRestHandler { DeleteIndexTemplateRequest deleteIndexTemplateRequest = new DeleteIndexTemplateRequest(request.param("name")); deleteIndexTemplateRequest.listenerThreaded(false); deleteIndexTemplateRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10))); + deleteIndexTemplateRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteIndexTemplateRequest.masterNodeTimeout())); client.admin().indices().deleteTemplate(deleteIndexTemplateRequest, new ActionListener() { @Override public void onResponse(DeleteIndexTemplateResponse response) { diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/template/put/RestPutIndexTemplateAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/template/put/RestPutIndexTemplateAction.java index 9bcc82a3730..40c8074d8cc 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/indices/template/put/RestPutIndexTemplateAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/template/put/RestPutIndexTemplateAction.java @@ -62,6 +62,7 @@ public class RestPutIndexTemplateAction extends BaseRestHandler { putRequest.listenerThreaded(false); putRequest.template(request.param("template", putRequest.template())); putRequest.order(request.paramAsInt("order", putRequest.order())); + putRequest.masterNodeTimeout(request.paramAsTime("master_timeout", putRequest.masterNodeTimeout())); try { putRequest.create(request.paramAsBoolean("create", false)); diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/warmer/delete/RestDeleteWarmerAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/warmer/delete/RestDeleteWarmerAction.java index 7efac34f956..78d69316969 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/indices/warmer/delete/RestDeleteWarmerAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/warmer/delete/RestDeleteWarmerAction.java @@ -52,6 +52,7 @@ public class RestDeleteWarmerAction extends BaseRestHandler { DeleteWarmerRequest deleteWarmerRequest = new DeleteWarmerRequest(request.param("name")) .indices(RestActions.splitIndices(request.param("index"))); deleteWarmerRequest.listenerThreaded(false); + deleteWarmerRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteWarmerRequest.masterNodeTimeout())); client.admin().indices().deleteWarmer(deleteWarmerRequest, new ActionListener() { @Override public void onResponse(DeleteWarmerResponse response) { diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/warmer/put/RestPutWarmerAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/warmer/put/RestPutWarmerAction.java index 963776c5779..7779c7d3909 100644 --- a/src/main/java/org/elasticsearch/rest/action/admin/indices/warmer/put/RestPutWarmerAction.java +++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/warmer/put/RestPutWarmerAction.java @@ -55,6 +55,7 @@ public class RestPutWarmerAction extends BaseRestHandler { .types(RestActions.splitTypes(request.param("type"))) .source(request.content(), request.contentUnsafe()); putWarmerRequest.searchRequest(searchRequest); + putWarmerRequest.masterNodeTimeout(request.paramAsTime("master_timeout", putWarmerRequest.masterNodeTimeout())); client.admin().indices().putWarmer(putWarmerRequest, new ActionListener() { @Override public void onResponse(PutWarmerResponse response) { diff --git a/src/test/java/org/elasticsearch/test/integration/cluster/ClusterServiceTests.java b/src/test/java/org/elasticsearch/test/integration/cluster/ClusterServiceTests.java index 859ba30585c..7b1f11b1223 100644 --- a/src/test/java/org/elasticsearch/test/integration/cluster/ClusterServiceTests.java +++ b/src/test/java/org/elasticsearch/test/integration/cluster/ClusterServiceTests.java @@ -88,7 +88,7 @@ public class ClusterServiceTests extends AbstractZenNodesTests { } @Override - public void onTimeout(String source) { + public void onTimeout(TimeValue timeout, String source) { timedOut.countDown(); } @@ -97,6 +97,10 @@ public class ClusterServiceTests extends AbstractZenNodesTests { executeCalled.set(true); return currentState; } + + @Override + public void clusterStateProcessed(ClusterState clusterState) { + } }); assertThat(timedOut.await(500, TimeUnit.MILLISECONDS), equalTo(true));