Merge pull request #13998 from ywelsch/tests/transportmasternodeaction

Tests for TransportMasterNodeAction
This commit is contained in:
Yannick Welsch 2015-10-13 05:48:40 -07:00
commit a550ebf381
2 changed files with 354 additions and 0 deletions

View File

@ -0,0 +1,331 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.master;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.cluster.TestClusterService;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportService;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
public class TransportMasterNodeActionTests extends ESTestCase {
private static ThreadPool threadPool;
private TestClusterService clusterService;
private TransportService transportService;
private CapturingTransport transport;
private DiscoveryNode localNode;
private DiscoveryNode remoteNode;
private DiscoveryNode[] allNodes;
@BeforeClass
public static void beforeClass() {
threadPool = new ThreadPool("TransportMasterNodeActionTests");
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
transport = new CapturingTransport();
clusterService = new TestClusterService(threadPool);
transportService = new TransportService(transport, threadPool);
transportService.start();
localNode = new DiscoveryNode("local_node", DummyTransportAddress.INSTANCE, Version.CURRENT);
remoteNode = new DiscoveryNode("remote_node", DummyTransportAddress.INSTANCE, Version.CURRENT);
allNodes = new DiscoveryNode[] { localNode, remoteNode };
}
@AfterClass
public static void afterClass() {
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
threadPool = null;
}
<T> void assertListenerThrows(String msg, ActionFuture<?> listener, Class<?> klass) throws InterruptedException {
try {
listener.get();
fail(msg);
} catch (ExecutionException ex) {
assertThat(ex.getCause(), instanceOf(klass));
}
}
public static class Request extends MasterNodeRequest<Request> {
@Override
public ActionRequestValidationException validate() {
return null;
}
}
class Response extends ActionResponse {}
class Action extends TransportMasterNodeAction<Request, Response> {
Action(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) {
super(settings, actionName, transportService, clusterService, threadPool,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), Request::new);
}
@Override
protected void doExecute(final Request request, ActionListener<Response> listener) {
// remove unneeded threading by wrapping listener with SAME to prevent super.doExecute from wrapping it with LISTENER
super.doExecute(request, new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.SAME, listener));
}
@Override
protected String executor() {
// very lightweight operation in memory, no need to fork to a thread
return ThreadPool.Names.SAME;
}
@Override
protected Response newResponse() {
return new Response();
}
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
listener.onResponse(new Response()); // default implementation, overridden in specific tests
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return null; // default implementation, overridden in specific tests
}
}
@Test
public void testLocalOperationWithoutBlocks() throws ExecutionException, InterruptedException {
final boolean masterOperationFailure = randomBoolean();
Request request = new Request();
PlainActionFuture<Response> listener = new PlainActionFuture<>();
final Throwable exception = new Throwable();
final Response response = new Response();
clusterService.setState(ClusterStateCreationUtils.state(localNode, localNode, allNodes));
new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) {
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
if (masterOperationFailure) {
listener.onFailure(exception);
} else {
listener.onResponse(response);
}
}
}.execute(request, listener);
assertTrue(listener.isDone());
if (masterOperationFailure) {
try {
listener.get();
fail("Expected exception but returned proper result");
} catch (ExecutionException ex) {
assertThat(ex.getCause(), equalTo(exception));
}
} else {
assertThat(listener.get(), equalTo(response));
}
}
@Test
public void testLocalOperationWithBlocks() throws ExecutionException, InterruptedException {
final boolean retryableBlock = randomBoolean();
final boolean unblockBeforeTimeout = randomBoolean();
Request request = new Request().masterNodeTimeout(TimeValue.timeValueSeconds(unblockBeforeTimeout ? 60 : 0));
PlainActionFuture<Response> listener = new PlainActionFuture<>();
ClusterBlock block = new ClusterBlock(1, "", retryableBlock, true,
randomFrom(RestStatus.values()), ClusterBlockLevel.ALL);
ClusterState stateWithBlock = ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes))
.blocks(ClusterBlocks.builder().addGlobalBlock(block)).build();
clusterService.setState(stateWithBlock);
new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) {
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
Set<ClusterBlock> blocks = state.blocks().global();
return blocks.isEmpty() ? null : new ClusterBlockException(blocks);
}
}.execute(request, listener);
if (retryableBlock && unblockBeforeTimeout) {
assertFalse(listener.isDone());
clusterService.setState(ClusterState.builder(ClusterStateCreationUtils.state(localNode, localNode, allNodes))
.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).build());
assertTrue(listener.isDone());
listener.get();
return;
}
assertTrue(listener.isDone());
assertListenerThrows("ClusterBlockException should be thrown", listener, ClusterBlockException.class);
}
@Test
public void testForceLocalOperation() throws ExecutionException, InterruptedException {
Request request = new Request();
PlainActionFuture<Response> listener = new PlainActionFuture<>();
clusterService.setState(ClusterStateCreationUtils.state(localNode, randomFrom(null, localNode, remoteNode), allNodes));
new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) {
@Override
protected boolean localExecute(Request request) {
return true;
}
}.execute(request, listener);
assertTrue(listener.isDone());
listener.get();
}
@Test
public void testMasterNotAvailable() throws ExecutionException, InterruptedException {
Request request = new Request().masterNodeTimeout(TimeValue.timeValueSeconds(0));
clusterService.setState(ClusterStateCreationUtils.state(localNode, null, allNodes));
PlainActionFuture<Response> listener = new PlainActionFuture<>();
new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool).execute(request, listener);
assertTrue(listener.isDone());
assertListenerThrows("MasterNotDiscoveredException should be thrown", listener, MasterNotDiscoveredException.class);
}
@Test
public void testMasterBecomesAvailable() throws ExecutionException, InterruptedException {
Request request = new Request();
clusterService.setState(ClusterStateCreationUtils.state(localNode, null, allNodes));
PlainActionFuture<Response> listener = new PlainActionFuture<>();
new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool).execute(request, listener);
assertFalse(listener.isDone());
clusterService.setState(ClusterStateCreationUtils.state(localNode, localNode, allNodes));
assertTrue(listener.isDone());
listener.get();
}
@Test
public void testDelegateToMaster() throws ExecutionException, InterruptedException {
Request request = new Request();
clusterService.setState(ClusterStateCreationUtils.state(localNode, remoteNode, allNodes));
PlainActionFuture<Response> listener = new PlainActionFuture<>();
final AtomicBoolean delegationToMaster = new AtomicBoolean();
new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) {
@Override
protected void processBeforeDelegationToMaster(Request request, ClusterState state) {
logger.debug("Delegation to master called");
delegationToMaster.set(true);
}
}.execute(request, listener);
assertTrue("processBeforeDelegationToMaster not called", delegationToMaster.get());
assertThat(transport.capturedRequests().length, equalTo(1));
CapturingTransport.CapturedRequest capturedRequest = transport.capturedRequests()[0];
assertTrue(capturedRequest.node.isMasterNode());
assertThat(capturedRequest.request, equalTo(request));
assertThat(capturedRequest.action, equalTo("testAction"));
Response response = new Response();
transport.handleResponse(capturedRequest.requestId, response);
assertTrue(listener.isDone());
assertThat(listener.get(), equalTo(response));
}
@Test
public void testDelegateToFailingMaster() throws ExecutionException, InterruptedException {
boolean failsWithConnectTransportException = randomBoolean();
Request request = new Request().masterNodeTimeout(TimeValue.timeValueSeconds(failsWithConnectTransportException ? 60 : 0));
clusterService.setState(ClusterStateCreationUtils.state(localNode, remoteNode, allNodes));
PlainActionFuture<Response> listener = new PlainActionFuture<>();
final AtomicBoolean delegationToMaster = new AtomicBoolean();
new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) {
@Override
protected void processBeforeDelegationToMaster(Request request, ClusterState state) {
logger.debug("Delegation to master called");
delegationToMaster.set(true);
}
}.execute(request, listener);
assertTrue("processBeforeDelegationToMaster not called", delegationToMaster.get());
assertThat(transport.capturedRequests().length, equalTo(1));
CapturingTransport.CapturedRequest capturedRequest = transport.capturedRequests()[0];
assertTrue(capturedRequest.node.isMasterNode());
assertThat(capturedRequest.request, equalTo(request));
assertThat(capturedRequest.action, equalTo("testAction"));
if (failsWithConnectTransportException) {
transport.handleResponse(capturedRequest.requestId, new ConnectTransportException(remoteNode, "Fake error"));
assertFalse(listener.isDone());
clusterService.setState(ClusterStateCreationUtils.state(localNode, localNode, allNodes));
assertTrue(listener.isDone());
listener.get();
} else {
Throwable t = new Throwable();
transport.handleResponse(capturedRequest.requestId, t);
assertTrue(listener.isDone());
try {
listener.get();
fail("Expected exception but returned proper result");
} catch (ExecutionException ex) {
assertThat(ex.getCause().getCause(), equalTo(t));
}
}
}
}

View File

@ -225,6 +225,29 @@ public class ClusterStateCreationUtils {
return state.build();
}
/**
* Creates a cluster state where local node and master node can be specified
* @param localNode node in allNodes that is the local node
* @param masterNode node in allNodes that is the master node. Can be null if no master exists
* @param allNodes all nodes in the cluster
* @return cluster state
*/
public static ClusterState state(DiscoveryNode localNode, DiscoveryNode masterNode, DiscoveryNode... allNodes) {
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
for (DiscoveryNode node : allNodes) {
discoBuilder.put(node);
}
if (masterNode != null) {
discoBuilder.masterNodeId(masterNode.id());
}
discoBuilder.localNodeId(localNode.id());
ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
state.nodes(discoBuilder);
state.metaData(MetaData.builder().generateClusterUuidIfNeeded());
return state.build();
}
private static DiscoveryNode newNode(int nodeId) {
return new DiscoveryNode("node_" + nodeId, DummyTransportAddress.INSTANCE, Version.CURRENT);
}