add recover_after gateway tests
This commit is contained in:
parent
3ec95f4e84
commit
eb4f4f99b3
|
@ -42,6 +42,8 @@ public class ClusterStateRequest extends MasterNodeOperationRequest {
|
|||
|
||||
private String[] filteredIndices = Strings.EMPTY_ARRAY;
|
||||
|
||||
private boolean local = false;
|
||||
|
||||
public ClusterStateRequest() {
|
||||
}
|
||||
|
||||
|
@ -94,6 +96,15 @@ public class ClusterStateRequest extends MasterNodeOperationRequest {
|
|||
return this;
|
||||
}
|
||||
|
||||
public ClusterStateRequest local(boolean local) {
|
||||
this.local = local;
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean local() {
|
||||
return this.local;
|
||||
}
|
||||
|
||||
@Override public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
filterRoutingTable = in.readBoolean();
|
||||
|
@ -107,6 +118,7 @@ public class ClusterStateRequest extends MasterNodeOperationRequest {
|
|||
filteredIndices[i] = in.readUTF();
|
||||
}
|
||||
}
|
||||
local = in.readBoolean();
|
||||
}
|
||||
|
||||
@Override public void writeTo(StreamOutput out) throws IOException {
|
||||
|
@ -119,5 +131,6 @@ public class ClusterStateRequest extends MasterNodeOperationRequest {
|
|||
for (String filteredIndex : filteredIndices) {
|
||||
out.writeUTF(filteredIndex);
|
||||
}
|
||||
out.writeBoolean(local);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -62,6 +62,10 @@ public class TransportClusterStateAction extends TransportMasterNodeOperationAct
|
|||
return new ClusterStateResponse();
|
||||
}
|
||||
|
||||
@Override protected boolean localExecute(ClusterStateRequest request) {
|
||||
return request.local();
|
||||
}
|
||||
|
||||
@Override protected ClusterStateResponse masterOperation(ClusterStateRequest request, ClusterState state) throws ElasticSearchException {
|
||||
ClusterState currentState = clusterService.state();
|
||||
ClusterState.Builder builder = newClusterStateBuilder();
|
||||
|
|
|
@ -65,6 +65,10 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
|
|||
|
||||
protected abstract Response masterOperation(Request request, ClusterState state) throws ElasticSearchException;
|
||||
|
||||
protected boolean localExecute(Request request) {
|
||||
return false;
|
||||
}
|
||||
|
||||
protected void checkBlock(Request request, ClusterState state) {
|
||||
|
||||
}
|
||||
|
@ -80,7 +84,7 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
|
|||
private void innerExecute(final Request request, final ActionListener<Response> listener, final boolean retrying) {
|
||||
final ClusterState clusterState = clusterService.state();
|
||||
final DiscoveryNodes nodes = clusterState.nodes();
|
||||
if (nodes.localNodeMaster()) {
|
||||
if (nodes.localNodeMaster() || localExecute(request)) {
|
||||
threadPool.execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
try {
|
||||
|
@ -183,7 +187,7 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
|
|||
|
||||
@Override public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
|
||||
final ClusterState clusterState = clusterService.state();
|
||||
if (clusterState.nodes().localNodeMaster()) {
|
||||
if (clusterState.nodes().localNodeMaster() || localExecute(request)) {
|
||||
checkBlock(request, clusterState);
|
||||
Response response = masterOperation(request, clusterState);
|
||||
channel.sendResponse(response);
|
||||
|
|
|
@ -87,6 +87,14 @@ public class ClusterStateRequestBuilder extends BaseClusterRequestBuilder<Cluste
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets if the cluster state request should be executed locally on the node, and not go to the master.
|
||||
*/
|
||||
public ClusterStateRequestBuilder setLocal(boolean local) {
|
||||
request.local(local);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override protected void doExecute(ActionListener<ClusterStateResponse> listener) {
|
||||
client.state(request, listener);
|
||||
}
|
||||
|
|
|
@ -47,7 +47,7 @@ import static org.elasticsearch.cluster.metadata.MetaData.*;
|
|||
*/
|
||||
public class GatewayService extends AbstractLifecycleComponent<GatewayService> implements ClusterStateListener {
|
||||
|
||||
public final ClusterBlock NOT_RECOVERED_FROM_GATEWAY_BLOCK = new ClusterBlock(1, "not recovered from gateway", ClusterBlockLevel.ALL);
|
||||
public static final ClusterBlock NOT_RECOVERED_FROM_GATEWAY_BLOCK = new ClusterBlock(1, "not recovered from gateway", ClusterBlockLevel.ALL);
|
||||
|
||||
private final Gateway gateway;
|
||||
|
||||
|
|
|
@ -69,6 +69,7 @@ public class RestClusterStateAction extends BaseRestHandler {
|
|||
clusterStateRequest.filterMetaData(request.paramAsBoolean("filter_metadata", clusterStateRequest.filterMetaData()));
|
||||
clusterStateRequest.filterBlocks(request.paramAsBoolean("filter_blocks", clusterStateRequest.filterBlocks()));
|
||||
clusterStateRequest.filteredIndices(RestActions.splitIndices(request.param("filter_indices", null)));
|
||||
clusterStateRequest.local(request.paramAsBoolean("local", clusterStateRequest.local()));
|
||||
client.admin().cluster().state(clusterStateRequest, new ActionListener<ClusterStateResponse>() {
|
||||
@Override public void onResponse(ClusterStateResponse response) {
|
||||
try {
|
||||
|
|
|
@ -0,0 +1,158 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.test.integration.gateway.none;
|
||||
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.gateway.GatewayService;
|
||||
import org.elasticsearch.test.integration.AbstractNodesTests;
|
||||
import org.testng.annotations.AfterMethod;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import static org.elasticsearch.common.settings.ImmutableSettings.*;
|
||||
import static org.hamcrest.MatcherAssert.*;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class RecoverAfterNodesTests extends AbstractNodesTests {
|
||||
|
||||
@AfterMethod public void closeNodes() {
|
||||
closeAllNodes();
|
||||
}
|
||||
|
||||
@Test public void testRecoverAfterNodes() {
|
||||
logger.info("--> start node (1)");
|
||||
startNode("node1", settingsBuilder().put("gateway.recover_after_nodes", 3));
|
||||
assertThat(client("node1").admin().cluster().prepareState().setLocal(true).execute().actionGet()
|
||||
.state().blocks().global(ClusterBlockLevel.METADATA),
|
||||
hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK));
|
||||
|
||||
logger.info("--> start node (2)");
|
||||
startNode("node2", settingsBuilder().put("gateway.recover_after_nodes", 3));
|
||||
assertThat(client("node1").admin().cluster().prepareState().setLocal(true).execute().actionGet()
|
||||
.state().blocks().global(ClusterBlockLevel.METADATA),
|
||||
hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK));
|
||||
assertThat(client("node2").admin().cluster().prepareState().setLocal(true).execute().actionGet()
|
||||
.state().blocks().global(ClusterBlockLevel.METADATA),
|
||||
hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK));
|
||||
|
||||
logger.info("--> start node (3)");
|
||||
startNode("node3", settingsBuilder().put("gateway.recover_after_nodes", 3));
|
||||
|
||||
assertThat(client("node1").admin().cluster().prepareState().setLocal(true).execute().actionGet()
|
||||
.state().blocks().global(ClusterBlockLevel.METADATA).isEmpty(),
|
||||
equalTo(true));
|
||||
assertThat(client("node2").admin().cluster().prepareState().setLocal(true).execute().actionGet()
|
||||
.state().blocks().global(ClusterBlockLevel.METADATA).isEmpty(),
|
||||
equalTo(true));
|
||||
assertThat(client("node3").admin().cluster().prepareState().setLocal(true).execute().actionGet()
|
||||
.state().blocks().global(ClusterBlockLevel.METADATA).isEmpty(),
|
||||
equalTo(true));
|
||||
}
|
||||
|
||||
@Test public void testRecoverAfterMasterNodes() {
|
||||
logger.info("--> start master_node (1)");
|
||||
startNode("master1", settingsBuilder().put("gateway.recover_after_master_nodes", 2).put("node.data", false).put("node.master", true));
|
||||
assertThat(client("master1").admin().cluster().prepareState().setLocal(true).execute().actionGet()
|
||||
.state().blocks().global(ClusterBlockLevel.METADATA),
|
||||
hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK));
|
||||
|
||||
logger.info("--> start data_node (1)");
|
||||
startNode("data1", settingsBuilder().put("gateway.recover_after_master_nodes", 2).put("node.data", true).put("node.master", false));
|
||||
assertThat(client("master1").admin().cluster().prepareState().setLocal(true).execute().actionGet()
|
||||
.state().blocks().global(ClusterBlockLevel.METADATA),
|
||||
hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK));
|
||||
assertThat(client("data1").admin().cluster().prepareState().setLocal(true).execute().actionGet()
|
||||
.state().blocks().global(ClusterBlockLevel.METADATA),
|
||||
hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK));
|
||||
|
||||
logger.info("--> start data_node (2)");
|
||||
startNode("data2", settingsBuilder().put("gateway.recover_after_master_nodes", 2).put("node.data", true).put("node.master", false));
|
||||
assertThat(client("master1").admin().cluster().prepareState().setLocal(true).execute().actionGet()
|
||||
.state().blocks().global(ClusterBlockLevel.METADATA),
|
||||
hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK));
|
||||
assertThat(client("data1").admin().cluster().prepareState().setLocal(true).execute().actionGet()
|
||||
.state().blocks().global(ClusterBlockLevel.METADATA),
|
||||
hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK));
|
||||
assertThat(client("data2").admin().cluster().prepareState().setLocal(true).execute().actionGet()
|
||||
.state().blocks().global(ClusterBlockLevel.METADATA),
|
||||
hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK));
|
||||
|
||||
logger.info("--> start master_node (2)");
|
||||
startNode("master2", settingsBuilder().put("gateway.recover_after_master_nodes", 2).put("node.data", false).put("node.master", true));
|
||||
assertThat(client("master1").admin().cluster().prepareState().setLocal(true).execute().actionGet()
|
||||
.state().blocks().global(ClusterBlockLevel.METADATA).isEmpty(),
|
||||
equalTo(true));
|
||||
assertThat(client("master2").admin().cluster().prepareState().setLocal(true).execute().actionGet()
|
||||
.state().blocks().global(ClusterBlockLevel.METADATA).isEmpty(),
|
||||
equalTo(true));
|
||||
assertThat(client("data1").admin().cluster().prepareState().setLocal(true).execute().actionGet()
|
||||
.state().blocks().global(ClusterBlockLevel.METADATA).isEmpty(),
|
||||
equalTo(true));
|
||||
assertThat(client("data2").admin().cluster().prepareState().setLocal(true).execute().actionGet()
|
||||
.state().blocks().global(ClusterBlockLevel.METADATA).isEmpty(),
|
||||
equalTo(true));
|
||||
}
|
||||
|
||||
@Test public void testRecoverAfterDataNodes() {
|
||||
logger.info("--> start master_node (1)");
|
||||
startNode("master1", settingsBuilder().put("gateway.recover_after_data_nodes", 2).put("node.data", false).put("node.master", true));
|
||||
assertThat(client("master1").admin().cluster().prepareState().setLocal(true).execute().actionGet()
|
||||
.state().blocks().global(ClusterBlockLevel.METADATA),
|
||||
hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK));
|
||||
|
||||
logger.info("--> start data_node (1)");
|
||||
startNode("data1", settingsBuilder().put("gateway.recover_after_data_nodes", 2).put("node.data", true).put("node.master", false));
|
||||
assertThat(client("master1").admin().cluster().prepareState().setLocal(true).execute().actionGet()
|
||||
.state().blocks().global(ClusterBlockLevel.METADATA),
|
||||
hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK));
|
||||
assertThat(client("data1").admin().cluster().prepareState().setLocal(true).execute().actionGet()
|
||||
.state().blocks().global(ClusterBlockLevel.METADATA),
|
||||
hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK));
|
||||
|
||||
logger.info("--> start master_node (2)");
|
||||
startNode("master2", settingsBuilder().put("gateway.recover_after_data_nodes", 2).put("node.data", false).put("node.master", true));
|
||||
assertThat(client("master1").admin().cluster().prepareState().setLocal(true).execute().actionGet()
|
||||
.state().blocks().global(ClusterBlockLevel.METADATA),
|
||||
hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK));
|
||||
assertThat(client("data1").admin().cluster().prepareState().setLocal(true).execute().actionGet()
|
||||
.state().blocks().global(ClusterBlockLevel.METADATA),
|
||||
hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK));
|
||||
assertThat(client("master2").admin().cluster().prepareState().setLocal(true).execute().actionGet()
|
||||
.state().blocks().global(ClusterBlockLevel.METADATA),
|
||||
hasItem(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK));
|
||||
|
||||
logger.info("--> start data_node (2)");
|
||||
startNode("data2", settingsBuilder().put("gateway.recover_after_data_nodes", 2).put("node.data", true).put("node.master", false));
|
||||
assertThat(client("master1").admin().cluster().prepareState().setLocal(true).execute().actionGet()
|
||||
.state().blocks().global(ClusterBlockLevel.METADATA).isEmpty(),
|
||||
equalTo(true));
|
||||
assertThat(client("master2").admin().cluster().prepareState().setLocal(true).execute().actionGet()
|
||||
.state().blocks().global(ClusterBlockLevel.METADATA).isEmpty(),
|
||||
equalTo(true));
|
||||
assertThat(client("data1").admin().cluster().prepareState().setLocal(true).execute().actionGet()
|
||||
.state().blocks().global(ClusterBlockLevel.METADATA).isEmpty(),
|
||||
equalTo(true));
|
||||
assertThat(client("data2").admin().cluster().prepareState().setLocal(true).execute().actionGet()
|
||||
.state().blocks().global(ClusterBlockLevel.METADATA).isEmpty(),
|
||||
equalTo(true));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue