diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequest.java index 4fed1f5fcce..d240a8354d1 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequest.java @@ -42,6 +42,8 @@ public class ClusterStateRequest extends MasterNodeOperationRequest { private String[] filteredIndices = Strings.EMPTY_ARRAY; + private boolean local = false; + public ClusterStateRequest() { } @@ -94,6 +96,15 @@ public class ClusterStateRequest extends MasterNodeOperationRequest { return this; } + public ClusterStateRequest local(boolean local) { + this.local = local; + return this; + } + + public boolean local() { + return this.local; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); filterRoutingTable = in.readBoolean(); @@ -107,6 +118,7 @@ public class ClusterStateRequest extends MasterNodeOperationRequest { filteredIndices[i] = in.readUTF(); } } + local = in.readBoolean(); } @Override public void writeTo(StreamOutput out) throws IOException { @@ -119,5 +131,6 @@ public class ClusterStateRequest extends MasterNodeOperationRequest { for (String filteredIndex : filteredIndices) { out.writeUTF(filteredIndex); } + out.writeBoolean(local); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java index eae3b7d9bff..aa75487649e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java @@ -62,6 +62,10 @@ public class TransportClusterStateAction extends TransportMasterNodeOperationAct return new ClusterStateResponse(); } + @Override protected boolean localExecute(ClusterStateRequest request) { + return request.local(); + } + @Override protected ClusterStateResponse masterOperation(ClusterStateRequest request, ClusterState state) throws ElasticSearchException { ClusterState currentState = clusterService.state(); ClusterState.Builder builder = newClusterStateBuilder(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java index 59fde617cc4..a54b8b48018 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeOperationAction.java @@ -65,6 +65,10 @@ public abstract class TransportMasterNodeOperationAction listener, final boolean retrying) { final ClusterState clusterState = clusterService.state(); final DiscoveryNodes nodes = clusterState.nodes(); - if (nodes.localNodeMaster()) { + if (nodes.localNodeMaster() || localExecute(request)) { threadPool.execute(new Runnable() { @Override public void run() { try { @@ -183,7 +187,7 @@ public abstract class TransportMasterNodeOperationAction listener) { client.state(request, listener); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java index 017fe2b05bd..1fe0afad74c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -47,7 +47,7 @@ import static org.elasticsearch.cluster.metadata.MetaData.*; */ public class GatewayService extends AbstractLifecycleComponent implements ClusterStateListener { - public final ClusterBlock NOT_RECOVERED_FROM_GATEWAY_BLOCK = new ClusterBlock(1, "not recovered from gateway", ClusterBlockLevel.ALL); + public static final ClusterBlock NOT_RECOVERED_FROM_GATEWAY_BLOCK = new ClusterBlock(1, "not recovered from gateway", ClusterBlockLevel.ALL); private final Gateway gateway; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/state/RestClusterStateAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/state/RestClusterStateAction.java index 066716ad950..9a90601ce82 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/state/RestClusterStateAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/state/RestClusterStateAction.java @@ -69,6 +69,7 @@ public class RestClusterStateAction extends BaseRestHandler { clusterStateRequest.filterMetaData(request.paramAsBoolean("filter_metadata", clusterStateRequest.filterMetaData())); clusterStateRequest.filterBlocks(request.paramAsBoolean("filter_blocks", clusterStateRequest.filterBlocks())); clusterStateRequest.filteredIndices(RestActions.splitIndices(request.param("filter_indices", null))); + clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local())); client.admin().cluster().state(clusterStateRequest, new ActionListener() { @Override public void onResponse(ClusterStateResponse response) { try { diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/none/RecoverAfterNodesTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/none/RecoverAfterNodesTests.java new file mode 100644 index 00000000000..e6d564af1e4 --- /dev/null +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/none/RecoverAfterNodesTests.java @@ -0,0 +1,158 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.integration.gateway.none; + +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.test.integration.AbstractNodesTests; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.Test; + +import static org.elasticsearch.common.settings.ImmutableSettings.*; +import static org.hamcrest.MatcherAssert.*; +import static org.hamcrest.Matchers.*; + +/** + * @author kimchy (shay.banon) + */ +public class RecoverAfterNodesTests extends AbstractNodesTests { + + @AfterMethod public void closeNodes() { + closeAllNodes(); + } + + @Test public void testRecoverAfterNodes() { + logger.info("--> start node (1)"); + startNode("node1", settingsBuilder().put("gateway.recover_after_nodes", 3)); + assertThat(client("node1").admin().cluster().prepareState().setLocal(true).execute().actionGet() + .state().blocks().global(ClusterBlockLevel.METADATA), + hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK)); + + logger.info("--> start node (2)"); + startNode("node2", settingsBuilder().put("gateway.recover_after_nodes", 3)); + assertThat(client("node1").admin().cluster().prepareState().setLocal(true).execute().actionGet() + .state().blocks().global(ClusterBlockLevel.METADATA), + hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK)); + assertThat(client("node2").admin().cluster().prepareState().setLocal(true).execute().actionGet() + .state().blocks().global(ClusterBlockLevel.METADATA), + hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK)); + + logger.info("--> start node (3)"); + startNode("node3", settingsBuilder().put("gateway.recover_after_nodes", 3)); + + assertThat(client("node1").admin().cluster().prepareState().setLocal(true).execute().actionGet() + .state().blocks().global(ClusterBlockLevel.METADATA).isEmpty(), + equalTo(true)); + assertThat(client("node2").admin().cluster().prepareState().setLocal(true).execute().actionGet() + .state().blocks().global(ClusterBlockLevel.METADATA).isEmpty(), + equalTo(true)); + assertThat(client("node3").admin().cluster().prepareState().setLocal(true).execute().actionGet() + .state().blocks().global(ClusterBlockLevel.METADATA).isEmpty(), + equalTo(true)); + } + + @Test public void testRecoverAfterMasterNodes() { + logger.info("--> start master_node (1)"); + startNode("master1", settingsBuilder().put("gateway.recover_after_master_nodes", 2).put("node.data", false).put("node.master", true)); + assertThat(client("master1").admin().cluster().prepareState().setLocal(true).execute().actionGet() + .state().blocks().global(ClusterBlockLevel.METADATA), + hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK)); + + logger.info("--> start data_node (1)"); + startNode("data1", settingsBuilder().put("gateway.recover_after_master_nodes", 2).put("node.data", true).put("node.master", false)); + assertThat(client("master1").admin().cluster().prepareState().setLocal(true).execute().actionGet() + .state().blocks().global(ClusterBlockLevel.METADATA), + hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK)); + assertThat(client("data1").admin().cluster().prepareState().setLocal(true).execute().actionGet() + .state().blocks().global(ClusterBlockLevel.METADATA), + hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK)); + + logger.info("--> start data_node (2)"); + startNode("data2", settingsBuilder().put("gateway.recover_after_master_nodes", 2).put("node.data", true).put("node.master", false)); + assertThat(client("master1").admin().cluster().prepareState().setLocal(true).execute().actionGet() + .state().blocks().global(ClusterBlockLevel.METADATA), + hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK)); + assertThat(client("data1").admin().cluster().prepareState().setLocal(true).execute().actionGet() + .state().blocks().global(ClusterBlockLevel.METADATA), + hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK)); + assertThat(client("data2").admin().cluster().prepareState().setLocal(true).execute().actionGet() + .state().blocks().global(ClusterBlockLevel.METADATA), + hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK)); + + logger.info("--> start master_node (2)"); + startNode("master2", settingsBuilder().put("gateway.recover_after_master_nodes", 2).put("node.data", false).put("node.master", true)); + assertThat(client("master1").admin().cluster().prepareState().setLocal(true).execute().actionGet() + .state().blocks().global(ClusterBlockLevel.METADATA).isEmpty(), + equalTo(true)); + assertThat(client("master2").admin().cluster().prepareState().setLocal(true).execute().actionGet() + .state().blocks().global(ClusterBlockLevel.METADATA).isEmpty(), + equalTo(true)); + assertThat(client("data1").admin().cluster().prepareState().setLocal(true).execute().actionGet() + .state().blocks().global(ClusterBlockLevel.METADATA).isEmpty(), + equalTo(true)); + assertThat(client("data2").admin().cluster().prepareState().setLocal(true).execute().actionGet() + .state().blocks().global(ClusterBlockLevel.METADATA).isEmpty(), + equalTo(true)); + } + + @Test public void testRecoverAfterDataNodes() { + logger.info("--> start master_node (1)"); + startNode("master1", settingsBuilder().put("gateway.recover_after_data_nodes", 2).put("node.data", false).put("node.master", true)); + assertThat(client("master1").admin().cluster().prepareState().setLocal(true).execute().actionGet() + .state().blocks().global(ClusterBlockLevel.METADATA), + hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK)); + + logger.info("--> start data_node (1)"); + startNode("data1", settingsBuilder().put("gateway.recover_after_data_nodes", 2).put("node.data", true).put("node.master", false)); + assertThat(client("master1").admin().cluster().prepareState().setLocal(true).execute().actionGet() + .state().blocks().global(ClusterBlockLevel.METADATA), + hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK)); + assertThat(client("data1").admin().cluster().prepareState().setLocal(true).execute().actionGet() + .state().blocks().global(ClusterBlockLevel.METADATA), + hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK)); + + logger.info("--> start master_node (2)"); + startNode("master2", settingsBuilder().put("gateway.recover_after_data_nodes", 2).put("node.data", false).put("node.master", true)); + assertThat(client("master1").admin().cluster().prepareState().setLocal(true).execute().actionGet() + .state().blocks().global(ClusterBlockLevel.METADATA), + hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK)); + assertThat(client("data1").admin().cluster().prepareState().setLocal(true).execute().actionGet() + .state().blocks().global(ClusterBlockLevel.METADATA), + hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK)); + assertThat(client("master2").admin().cluster().prepareState().setLocal(true).execute().actionGet() + .state().blocks().global(ClusterBlockLevel.METADATA), + hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK)); + + logger.info("--> start data_node (2)"); + startNode("data2", settingsBuilder().put("gateway.recover_after_data_nodes", 2).put("node.data", true).put("node.master", false)); + assertThat(client("master1").admin().cluster().prepareState().setLocal(true).execute().actionGet() + .state().blocks().global(ClusterBlockLevel.METADATA).isEmpty(), + equalTo(true)); + assertThat(client("master2").admin().cluster().prepareState().setLocal(true).execute().actionGet() + .state().blocks().global(ClusterBlockLevel.METADATA).isEmpty(), + equalTo(true)); + assertThat(client("data1").admin().cluster().prepareState().setLocal(true).execute().actionGet() + .state().blocks().global(ClusterBlockLevel.METADATA).isEmpty(), + equalTo(true)); + assertThat(client("data2").admin().cluster().prepareState().setLocal(true).execute().actionGet() + .state().blocks().global(ClusterBlockLevel.METADATA).isEmpty(), + equalTo(true)); + } +}