From cc4cae3ba04dde6f4fa739d5f9c61e5bea196837 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Mon, 31 Mar 2014 22:13:57 +0200 Subject: [PATCH] Mock Transport: Allow to simulate network failures An infrastructure that allows to simulate different network topologies failures, including 2 basic ones in failure to send requests, and unresponsive nodes closes #5631 --- .../transport/TransportModule.java | 8 +- .../transport/TransportService.java | 5 +- .../org/elasticsearch/test/TestCluster.java | 9 + .../test/transport/MockTransportService.java | 247 ++++++++++++++++++ .../AbstractSimpleTransportTests.java | 145 +++++++++- .../local/SimpleLocalTransportTests.java | 8 +- .../netty/SimpleNettyTransportTests.java | 8 +- 7 files changed, 418 insertions(+), 12 deletions(-) create mode 100644 src/test/java/org/elasticsearch/test/transport/MockTransportService.java diff --git a/src/main/java/org/elasticsearch/transport/TransportModule.java b/src/main/java/org/elasticsearch/transport/TransportModule.java index b4a7b69d60b..1a8942bd861 100644 --- a/src/main/java/org/elasticsearch/transport/TransportModule.java +++ b/src/main/java/org/elasticsearch/transport/TransportModule.java @@ -37,6 +37,7 @@ public class TransportModule extends AbstractModule implements SpawnModules { private final Settings settings; public static final String TRANSPORT_TYPE_KEY = "transport.type"; + public static final String TRANSPORT_SERVICE_TYPE_KEY = "transport.service.type"; public TransportModule(Settings settings) { this.settings = settings; @@ -55,6 +56,11 @@ public class TransportModule extends AbstractModule implements SpawnModules { @Override protected void configure() { - bind(TransportService.class).asEagerSingleton(); + Class transportService = settings.getAsClass(TRANSPORT_SERVICE_TYPE_KEY, TransportService.class, "org.elasticsearch.transport.", "TransportService"); + if (!TransportService.class.equals(transportService)) { + bind(TransportService.class).to(transportService).asEagerSingleton(); + } else { + bind(TransportService.class).asEagerSingleton(); + } } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/transport/TransportService.java b/src/main/java/org/elasticsearch/transport/TransportService.java index 998778a48bc..a99336f903d 100644 --- a/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/src/main/java/org/elasticsearch/transport/TransportService.java @@ -49,9 +49,8 @@ import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_ */ public class TransportService extends AbstractLifecycleComponent { - private final Transport transport; - - private final ThreadPool threadPool; + protected final Transport transport; + protected final ThreadPool threadPool; volatile ImmutableMap serverHandlers = ImmutableMap.of(); final Object serverHandlersMutex = new Object(); diff --git a/src/test/java/org/elasticsearch/test/TestCluster.java b/src/test/java/org/elasticsearch/test/TestCluster.java index c5eb84a1161..5615b515c88 100644 --- a/src/test/java/org/elasticsearch/test/TestCluster.java +++ b/src/test/java/org/elasticsearch/test/TestCluster.java @@ -59,6 +59,7 @@ import org.elasticsearch.test.cache.recycler.MockPageCacheRecyclerModule; import org.elasticsearch.test.engine.MockEngineModule; import org.elasticsearch.test.store.MockFSIndexStoreModule; import org.elasticsearch.test.transport.AssertingLocalTransportModule; +import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportModule; @@ -233,6 +234,7 @@ public final class TestCluster extends ImmutableTestCluster { builder.put(IndexEngineModule.EngineSettings.ENGINE_TYPE, MockEngineModule.class.getName()); builder.put(PageCacheRecyclerModule.CACHE_IMPL, MockPageCacheRecyclerModule.class.getName()); builder.put(BigArraysModule.IMPL, MockBigArraysModule.class.getName()); + builder.put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName()); } if (isLocalTransportConfigured()) { builder.put(TransportModule.TRANSPORT_TYPE_KEY, AssertingLocalTransportModule.class.getName()); @@ -667,6 +669,13 @@ public final class TestCluster extends ImmutableTestCluster { if (wipeData) { wipeDataDirectories(); } + // clear all rules for mock transport services + for (NodeAndClient nodeAndClient : nodes.values()) { + TransportService transportService = nodeAndClient.node.injector().getInstance(TransportService.class); + if (transportService instanceof MockTransportService) { + ((MockTransportService) transportService).clearAllRules(); + } + } if (nextNodeId.get() == sharedNodesSeeds.length && nodes.size() == sharedNodesSeeds.length) { logger.debug("Cluster hasn't changed - moving out - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]", nodes.keySet(), nextNodeId.get(), sharedNodesSeeds.length); return; diff --git a/src/test/java/org/elasticsearch/test/transport/MockTransportService.java b/src/test/java/org/elasticsearch/test/transport/MockTransportService.java new file mode 100644 index 00000000000..1c6d7a39ac1 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/transport/MockTransportService.java @@ -0,0 +1,247 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.test.transport; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.component.Lifecycle; +import org.elasticsearch.common.component.LifecycleListener; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.BoundTransportAddress; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.*; + +import java.io.IOException; +import java.util.concurrent.ConcurrentMap; + +/** + * A mock transport service that allows to simulate different network topology failures. + */ +public class MockTransportService extends TransportService { + + @Inject + public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool) { + super(settings, new LookupTestTransport(transport), threadPool); + } + + /** + * Clears all the registered rules. + */ + public void clearAllRules() { + ((LookupTestTransport) transport).transports.clear(); + } + + /** + * Clears the rule associated with the provided node. + */ + public void clearRule(DiscoveryNode node) { + ((LookupTestTransport) transport).transports.remove(node); + } + + /** + * Adds a rule that will cause every send request to fail, and each new connect since the rule + * is added to fail as well. + */ + public void addFailToSendNoConnectRule(DiscoveryNode node) { + ((LookupTestTransport) transport).transports.put(node, new DelegateTransport(transport) { + @Override + public void connectToNode(DiscoveryNode node) throws ConnectTransportException { + throw new ConnectTransportException(node, "DISCONNECT: simulated"); + } + + @Override + public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException { + throw new ConnectTransportException(node, "DISCONNECT: simulated"); + } + + @Override + public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + throw new ConnectTransportException(node, "DISCONNECT: simulated"); + } + }); + } + + /** + * Adds a rule that will cause ignores each send request, simulating an unresponsive node + * and failing to connect once the rule was added. + */ + public void addUnresponsiveRule(DiscoveryNode node) { + // TODO add a parameter to delay the connect timeout? + ((LookupTestTransport) transport).transports.put(node, new DelegateTransport(transport) { + @Override + public void connectToNode(DiscoveryNode node) throws ConnectTransportException { + throw new ConnectTransportException(node, "UNRESPONSIVE: simulated"); + } + + @Override + public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException { + throw new ConnectTransportException(node, "UNRESPONSIVE: simulated"); + } + + @Override + public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + // don't send anything, the receiving node is unresponsive + } + }); + } + + /** + * A lookup transport that has a list of potential Transport implementations to delegate to for node operations, + * if none is registered, then the default one is used. + */ + private static class LookupTestTransport extends DelegateTransport { + + final ConcurrentMap transports = ConcurrentCollections.newConcurrentMap(); + + LookupTestTransport(Transport transport) { + super(transport); + } + + private Transport getTransport(DiscoveryNode node) { + Transport transport = transports.get(node); + if (transport != null) { + return transport; + } + // TODO, if we miss on node by UID, we should have an option to lookup based on address? + return this.transport; + } + + @Override + public boolean nodeConnected(DiscoveryNode node) { + return getTransport(node).nodeConnected(node); + } + + @Override + public void connectToNode(DiscoveryNode node) throws ConnectTransportException { + getTransport(node).connectToNode(node); + } + + @Override + public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException { + getTransport(node).connectToNodeLight(node); + } + + @Override + public void disconnectFromNode(DiscoveryNode node) { + getTransport(node).disconnectFromNode(node); + } + + @Override + public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + getTransport(node).sendRequest(node, requestId, action, request, options); + } + } + + /** + * A pure delegate transport. + * Can be extracted to a common class if needed in other places in the codebase. + */ + private static class DelegateTransport implements Transport { + + protected final Transport transport; + + DelegateTransport(Transport transport) { + this.transport = transport; + } + + @Override + public void transportServiceAdapter(TransportServiceAdapter service) { + transport.transportServiceAdapter(service); + } + + @Override + public BoundTransportAddress boundAddress() { + return transport.boundAddress(); + } + + @Override + public TransportAddress[] addressesFromString(String address) throws Exception { + return transport.addressesFromString(address); + } + + @Override + public boolean addressSupported(Class address) { + return transport.addressSupported(address); + } + + @Override + public boolean nodeConnected(DiscoveryNode node) { + return transport.nodeConnected(node); + } + + @Override + public void connectToNode(DiscoveryNode node) throws ConnectTransportException { + transport.connectToNode(node); + } + + @Override + public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException { + transport.connectToNodeLight(node); + } + + @Override + public void disconnectFromNode(DiscoveryNode node) { + transport.disconnectFromNode(node); + } + + @Override + public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + transport.sendRequest(node, requestId, action, request, options); + } + + @Override + public long serverOpen() { + return transport.serverOpen(); + } + + @Override + public Lifecycle.State lifecycleState() { + return transport.lifecycleState(); + } + + @Override + public void addLifecycleListener(LifecycleListener listener) { + transport.addLifecycleListener(listener); + } + + @Override + public void removeLifecycleListener(LifecycleListener listener) { + transport.removeLifecycleListener(listener); + } + + @Override + public Transport start() throws ElasticsearchException { + return transport.start(); + } + + @Override + public Transport stop() throws ElasticsearchException { + return transport.stop(); + } + + @Override + public void close() throws ElasticsearchException { + transport.close(); + } + } +} diff --git a/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java b/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java index ee4dc162227..5126dbd4445 100644 --- a/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java +++ b/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.ElasticsearchTestCase; +import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; import org.junit.Before; @@ -50,13 +51,13 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase protected static final Version version0 = Version.fromId(/*0*/99); protected DiscoveryNode nodeA; - protected TransportService serviceA; + protected MockTransportService serviceA; protected static final Version version1 = Version.fromId(199); protected DiscoveryNode nodeB; - protected TransportService serviceB; + protected MockTransportService serviceB; - protected abstract TransportService build(Settings settings, Version version); + protected abstract MockTransportService build(Settings settings, Version version); @Before public void setUp() throws Exception { @@ -872,4 +873,142 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase assertThat(version0Response.value1, equalTo(1)); } + + @Test + public void testMockFailToSendNoConnectRule() { + serviceA.registerHandler("sayHello", new BaseTransportRequestHandler() { + @Override + public StringMessageRequest newInstance() { + return new StringMessageRequest(); + } + + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } + + @Override + public void messageReceived(StringMessageRequest request, TransportChannel channel) throws Exception { + assertThat("moshe", equalTo(request.message)); + throw new RuntimeException("bad message !!!"); + } + }); + + serviceB.addFailToSendNoConnectRule(nodeA); + + TransportFuture res = serviceB.submitRequest(nodeA, "sayHello", + new StringMessageRequest("moshe"), new BaseTransportResponseHandler() { + @Override + public StringMessageResponse newInstance() { + return new StringMessageResponse(); + } + + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } + + @Override + public void handleResponse(StringMessageResponse response) { + assertThat("got response instead of exception", false, equalTo(true)); + } + + @Override + public void handleException(TransportException exp) { + assertThat(exp.getCause().getMessage(), endsWith("DISCONNECT: simulated")); + } + }); + + try { + res.txGet(); + assertThat("exception should be thrown", false, equalTo(true)); + } catch (Exception e) { + assertThat(e.getCause().getMessage(), endsWith("DISCONNECT: simulated")); + } + + try { + serviceB.connectToNode(nodeA); + assertThat("exception should be thrown", false, equalTo(true)); + } catch (ConnectTransportException e) { + // all is well + } + + try { + serviceB.connectToNodeLight(nodeA); + assertThat("exception should be thrown", false, equalTo(true)); + } catch (ConnectTransportException e) { + // all is well + } + + serviceA.removeHandler("sayHello"); + } + + @Test + public void testMockUnresponsiveRule() { + serviceA.registerHandler("sayHello", new BaseTransportRequestHandler() { + @Override + public StringMessageRequest newInstance() { + return new StringMessageRequest(); + } + + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } + + @Override + public void messageReceived(StringMessageRequest request, TransportChannel channel) throws Exception { + assertThat("moshe", equalTo(request.message)); + throw new RuntimeException("bad message !!!"); + } + }); + + serviceB.addUnresponsiveRule(nodeA); + + TransportFuture res = serviceB.submitRequest(nodeA, "sayHello", + new StringMessageRequest("moshe"), TransportRequestOptions.options().withTimeout(100), new BaseTransportResponseHandler() { + @Override + public StringMessageResponse newInstance() { + return new StringMessageResponse(); + } + + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } + + @Override + public void handleResponse(StringMessageResponse response) { + assertThat("got response instead of exception", false, equalTo(true)); + } + + @Override + public void handleException(TransportException exp) { + assertThat(exp, instanceOf(ReceiveTimeoutTransportException.class)); + } + }); + + try { + res.txGet(); + assertThat("exception should be thrown", false, equalTo(true)); + } catch (Exception e) { + assertThat(e, instanceOf(ReceiveTimeoutTransportException.class)); + } + + try { + serviceB.connectToNode(nodeA); + assertThat("exception should be thrown", false, equalTo(true)); + } catch (ConnectTransportException e) { + // all is well + } + + try { + serviceB.connectToNodeLight(nodeA); + assertThat("exception should be thrown", false, equalTo(true)); + } catch (ConnectTransportException e) { + // all is well + } + + serviceA.removeHandler("sayHello"); + } } diff --git a/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java b/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java index 2b6b42e3dbf..14721a94b3c 100644 --- a/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java +++ b/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java @@ -20,14 +20,18 @@ package org.elasticsearch.transport.local; import org.elasticsearch.Version; +import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.AbstractSimpleTransportTests; import org.elasticsearch.transport.TransportService; public class SimpleLocalTransportTests extends AbstractSimpleTransportTests { @Override - protected TransportService build(Settings settings, Version version) { - return new TransportService(new LocalTransport(settings, threadPool, version), threadPool).start(); + protected MockTransportService build(Settings settings, Version version) { + MockTransportService transportService = new MockTransportService(ImmutableSettings.EMPTY, new LocalTransport(settings, threadPool, version), threadPool); + transportService.start(); + return transportService; } } \ No newline at end of file diff --git a/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java b/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java index dbd3bd80fd4..17db1009478 100644 --- a/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java +++ b/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java @@ -25,19 +25,21 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.AbstractSimpleTransportTests; import org.elasticsearch.transport.ConnectTransportException; -import org.elasticsearch.transport.TransportService; import org.junit.Test; public class SimpleNettyTransportTests extends AbstractSimpleTransportTests { @Override - protected TransportService build(Settings settings, Version version) { + protected MockTransportService build(Settings settings, Version version) { int startPort = 11000 + randomIntBetween(0, 255); int endPort = startPort + 10; settings = ImmutableSettings.builder().put(settings).put("transport.tcp.port", startPort + "-" + endPort).build(); - return new TransportService(settings, new NettyTransport(settings, threadPool, new NetworkService(settings), version), threadPool).start(); + MockTransportService transportService = new MockTransportService(settings, new NettyTransport(settings, threadPool, new NetworkService(settings), version), threadPool); + transportService.start(); + return transportService; } @Test