From 17ddee7011bbdf9743db4b0d33263cabef68e8f7 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 14 Sep 2016 20:32:29 +0200 Subject: [PATCH] Remove TransportService#registerRequestHandler leniency (#20469) `TransportService#registerRequestHandler` allowed to register handlers more than once and issues an annoying warn log message when this happens. This change simple throws an exception to prevent regsitering the same handler more than once. This commit also removes the ability to remove request handlers. Relates to #20468 --- .../cluster/node/DiscoveryNode.java | 2 +- .../discovery/zen/ZenDiscovery.java | 2 - .../zen/fd/MasterFaultDetection.java | 1 - .../discovery/zen/fd/NodesFaultDetection.java | 1 - .../zen/membership/MembershipAction.java | 6 -- .../zen/ping/unicast/UnicastZenPing.java | 1 - .../publish/PublishClusterStateAction.java | 5 -- .../VerifyNodeRepositoryAction.java | 4 - .../transport/TransportService.java | 15 +--- .../TransportReplicationActionTests.java | 14 +++- .../discovery/zen/ZenDiscoveryUnitTests.java | 78 +++++++++++-------- .../PublishClusterStateActionTests.java | 1 - .../test/transport/MockTransportService.java | 2 - .../AbstractSimpleTransportTestCase.java | 30 +++---- 14 files changed, 74 insertions(+), 88 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java b/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java index 0c39c43bc9f..55be77d201f 100644 --- a/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java +++ b/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java @@ -96,7 +96,7 @@ public class DiscoveryNode implements Writeable, ToXContent { * @param version the version of the node */ public DiscoveryNode(final String id, TransportAddress address, Version version) { - this(id, address, Collections.emptyMap(), Collections.emptySet(), version); + this(id, address, Collections.emptyMap(), EnumSet.allOf(Role.class), version); } /** diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index f419da06e68..132505fb403 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -284,8 +284,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover protected void doClose() { masterFD.close(); nodesFD.close(); - publishClusterState.close(); - membership.close(); pingService.close(); } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java b/core/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java index 6dc89998046..04aee9db3d8 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java @@ -168,7 +168,6 @@ public class MasterFaultDetection extends FaultDetection { super.close(); stop("closing"); this.listeners.clear(); - transportService.removeHandler(MASTER_PING_ACTION_NAME); } @Override diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java b/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java index 0ab5bde25cd..6361d3cde39 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java @@ -139,7 +139,6 @@ public class NodesFaultDetection extends FaultDetection { public void close() { super.close(); stop(); - transportService.removeHandler(PING_ACTION_NAME); } @Override diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java index 961b8d79728..8740d12c5f7 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java @@ -76,12 +76,6 @@ public class MembershipAction extends AbstractComponent { transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new, ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler()); } - public void close() { - transportService.removeHandler(DISCOVERY_JOIN_ACTION_NAME); - transportService.removeHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME); - transportService.removeHandler(DISCOVERY_LEAVE_ACTION_NAME); - } - public void sendLeaveRequest(DiscoveryNode masterNode, DiscoveryNode node) { transportService.sendRequest(node, DISCOVERY_LEAVE_ACTION_NAME, new LeaveRequest(masterNode), EmptyTransportResponseHandler.INSTANCE_SAME); } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index 231fa3ee070..afe4902f887 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -211,7 +211,6 @@ public class UnicastZenPing extends AbstractLifecycleComponent implements ZenPin @Override protected void doClose() { - transportService.removeHandler(ACTION_NAME); ThreadPool.terminate(unicastConnectExecutor, 0, TimeUnit.SECONDS); try { IOUtils.close(receivedResponses.values()); diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index 06c25ebf81a..870e34cc1f3 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -107,11 +107,6 @@ public class PublishClusterStateAction extends AbstractComponent { transportService.registerRequestHandler(COMMIT_ACTION_NAME, CommitClusterStateRequest::new, ThreadPool.Names.SAME, new CommitClusterStateRequestHandler()); } - public void close() { - transportService.removeHandler(SEND_ACTION_NAME); - transportService.removeHandler(COMMIT_ACTION_NAME); - } - public PendingClusterStatesQueue pendingStatesQueue() { return pendingStatesQueue; } diff --git a/core/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java b/core/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java index 49edae3ce22..cc1170a4841 100644 --- a/core/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java +++ b/core/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java @@ -64,10 +64,6 @@ public class VerifyNodeRepositoryAction extends AbstractComponent { transportService.registerRequestHandler(ACTION_NAME, VerifyNodeRepositoryRequest::new, ThreadPool.Names.SAME, new VerifyNodeRepositoryRequestHandler()); } - public void close() { - transportService.removeHandler(ACTION_NAME); - } - public void verify(String repository, String verificationToken, final ActionListener listener) { final DiscoveryNodes discoNodes = clusterService.state().nodes(); final DiscoveryNode localNode = discoNodes.getLocalNode(); diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index 1c807553a24..20b8c77d44a 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -620,19 +620,12 @@ public class TransportService extends AbstractLifecycleComponent { registerRequestHandler(reg); } - protected void registerRequestHandler(RequestHandlerRegistry reg) { + private void registerRequestHandler(RequestHandlerRegistry reg) { synchronized (requestHandlerMutex) { - RequestHandlerRegistry replaced = requestHandlers.get(reg.getAction()); - requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap(); - if (replaced != null) { - logger.warn("registered two transport handlers for action {}, handlers: {}, {}", reg.getAction(), reg, replaced); + if (requestHandlers.containsKey(reg.getAction())) { + throw new IllegalArgumentException("transport handlers for action " + reg.getAction() + " is already registered"); } - } - } - - public void removeHandler(String action) { - synchronized (requestHandlerMutex) { - requestHandlers = MapBuilder.newMapBuilder(requestHandlers).remove(action).immutableMap(); + requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap(); } } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index bda117642a0..f821f82c33a 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -131,6 +131,8 @@ public class TransportReplicationActionTests extends ESTestCase { private TransportService transportService; private CapturingTransport transport; private Action action; + private ShardStateAction shardStateAction; + /* * * TransportReplicationAction needs an instance of IndexShard to count operations. * indexShards is reset to null before each test and will be initialized upon request in the tests. @@ -150,7 +152,8 @@ public class TransportReplicationActionTests extends ESTestCase { transportService = new TransportService(clusterService.getSettings(), transport, threadPool); transportService.start(); transportService.acceptIncomingRequests(); - action = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool); + shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool); + action = new Action(Settings.EMPTY, "testAction", transportService, clusterService, shardStateAction, threadPool); } @After @@ -707,7 +710,8 @@ public class TransportReplicationActionTests extends ESTestCase { final ShardRouting replicaRouting = state.getRoutingTable().shardRoutingTable(shardId).replicaShards().get(0); boolean throwException = randomBoolean(); final ReplicationTask task = maybeTask(); - Action action = new Action(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool) { + Action action = new Action(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, shardStateAction, + threadPool) { @Override protected ReplicaResult shardOperationOnReplica(Request request) { assertIndexShardCounter(1); @@ -826,7 +830,8 @@ public class TransportReplicationActionTests extends ESTestCase { setState(clusterService, state); AtomicBoolean throwException = new AtomicBoolean(true); final ReplicationTask task = maybeTask(); - Action action = new Action(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool) { + Action action = new Action(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, shardStateAction, + threadPool) { @Override protected ReplicaResult shardOperationOnReplica(Request request) { assertPhase(task, "replica"); @@ -940,9 +945,10 @@ public class TransportReplicationActionTests extends ESTestCase { Action(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, + ShardStateAction shardStateAction, ThreadPool threadPool) { super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool, - new ShardStateAction(settings, clusterService, transportService, null, null, threadPool), + shardStateAction, new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), Request::new, Request::new, ThreadPool.Names.SAME); } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java index 235df2d8a35..b9f65016048 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java @@ -19,7 +19,9 @@ package org.elasticsearch.discovery.zen; +import org.apache.lucene.util.IOUtils; import org.elasticsearch.Version; +import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -35,18 +37,18 @@ import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.discovery.zen.ping.ZenPingService; import org.elasticsearch.discovery.zen.publish.PublishClusterStateActionTests.AssertingAckListener; -import org.elasticsearch.discovery.zen.publish.PublishClusterStateActionTests.MockNode; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import java.io.Closeable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -55,14 +57,11 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.elasticsearch.discovery.zen.ZenDiscovery.shouldIgnoreOrRejectNewClusterState; import static org.elasticsearch.discovery.zen.elect.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING; -import static org.elasticsearch.discovery.zen.publish.PublishClusterStateActionTests.createMockNode; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; import static org.elasticsearch.test.ClusterServiceUtils.setState; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; -/** - */ public class ZenDiscoveryUnitTests extends ESTestCase { public void testShouldIgnoreNewClusterState() { @@ -154,59 +153,76 @@ public class ZenDiscoveryUnitTests extends ESTestCase { Settings settings = Settings.builder() .put(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), Integer.toString(minMasterNodes)).build(); - Map nodes = new HashMap<>(); - ZenDiscovery zenDiscovery = null; - ClusterService clusterService = null; + ArrayList toClose = new ArrayList<>(); try { Set expectedFDNodes = null; - // create master node and its mocked up services - MockNode master = createMockNode("master", settings, null, threadPool, logger, nodes).setAsMaster(); - ClusterState state = master.clusterState; // initial cluster state + final MockTransportService masterTransport = MockTransportService.local(settings, Version.CURRENT, threadPool); + masterTransport.start(); + DiscoveryNode masterNode = new DiscoveryNode("master", masterTransport.boundAddress().publishAddress(), Version.CURRENT); + toClose.add(masterTransport); + masterTransport.setLocalNode(masterNode); + ClusterState state = ClusterStateCreationUtils.state(masterNode, masterNode, masterNode); // build the zen discovery and cluster service - clusterService = createClusterService(threadPool, master.discoveryNode); - setState(clusterService, state); - zenDiscovery = buildZenDiscovery(settings, master, clusterService, threadPool); + ClusterService masterClusterService = createClusterService(threadPool, masterNode); + toClose.add(masterClusterService); + // TODO: clustername shouldn't be stored twice in cluster service, but for now, work around it + state = ClusterState.builder(masterClusterService.getClusterName()).nodes(state.nodes()).build(); + setState(masterClusterService, state); + ZenDiscovery masterZen = buildZenDiscovery(settings, masterTransport, masterClusterService, threadPool); + toClose.add(masterZen); + masterTransport.acceptIncomingRequests(); + + final MockTransportService otherTransport = MockTransportService.local(settings, Version.CURRENT, threadPool); + otherTransport.start(); + toClose.add(otherTransport); + DiscoveryNode otherNode = new DiscoveryNode("other", otherTransport.boundAddress().publishAddress(), Version.CURRENT); + otherTransport.setLocalNode(otherNode); + final ClusterState otherState = ClusterState.builder(masterClusterService.getClusterName()) + .nodes(DiscoveryNodes.builder().add(otherNode).localNodeId(otherNode.getId())).build(); + ClusterService otherClusterService = createClusterService(threadPool, masterNode); + toClose.add(otherClusterService); + setState(otherClusterService, otherState); + ZenDiscovery otherZen = buildZenDiscovery(settings, otherTransport, otherClusterService, threadPool); + toClose.add(otherZen); + otherTransport.acceptIncomingRequests(); + + + masterTransport.connectToNode(otherNode); + otherTransport.connectToNode(masterNode); // a new cluster state with a new discovery node (we will test if the cluster state // was updated by the presence of this node in NodesFaultDetection) - MockNode newNode = createMockNode("new_node", settings, null, threadPool, logger, nodes); - ClusterState newState = ClusterState.builder(state).incrementVersion().nodes( - DiscoveryNodes.builder(state.nodes()).add(newNode.discoveryNode).masterNodeId(master.discoveryNode.getId()) + ClusterState newState = ClusterState.builder(masterClusterService.state()).incrementVersion().nodes( + DiscoveryNodes.builder(state.nodes()).add(otherNode).masterNodeId(masterNode.getId()) ).build(); try { // publishing a new cluster state ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent("testing", newState, state); AssertingAckListener listener = new AssertingAckListener(newState.nodes().getSize() - 1); - expectedFDNodes = zenDiscovery.getFaultDetectionNodes(); - zenDiscovery.publish(clusterChangedEvent, listener); + expectedFDNodes = masterZen.getFaultDetectionNodes(); + masterZen.publish(clusterChangedEvent, listener); listener.await(1, TimeUnit.HOURS); // publish was a success, update expected FD nodes based on new cluster state - expectedFDNodes = fdNodesForState(newState, master.discoveryNode); + expectedFDNodes = fdNodesForState(newState, masterNode); } catch (Discovery.FailedToCommitClusterStateException e) { // not successful, so expectedFDNodes above should remain what it was originally assigned assertEquals(3, minMasterNodes); // ensure min master nodes is the higher value, otherwise we shouldn't fail } - assertEquals(expectedFDNodes, zenDiscovery.getFaultDetectionNodes()); + assertEquals(expectedFDNodes, masterZen.getFaultDetectionNodes()); } finally { - // clean close of transport service and publish action for each node - zenDiscovery.close(); - clusterService.close(); - for (MockNode curNode : nodes.values()) { - curNode.action.close(); - curNode.service.close(); - } + IOUtils.close(toClose); terminate(threadPool); } } - private ZenDiscovery buildZenDiscovery(Settings settings, MockNode master, ClusterService clusterService, ThreadPool threadPool) { + private ZenDiscovery buildZenDiscovery(Settings settings, TransportService service, ClusterService clusterService, ThreadPool threadPool) { ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ZenPingService zenPingService = new ZenPingService(settings, Collections.emptySet()); ElectMasterService electMasterService = new ElectMasterService(settings); - ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, master.service, clusterService, + ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service, clusterService, clusterSettings, zenPingService, electMasterService); zenDiscovery.start(); return zenDiscovery; diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java index e3931a6da75..50ec06694fe 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java @@ -223,7 +223,6 @@ public class PublishClusterStateActionTests extends ESTestCase { public void tearDown() throws Exception { super.tearDown(); for (MockNode curNode : nodes.values()) { - curNode.action.close(); curNode.service.close(); } terminate(threadPool); diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 9c4612b22b3..90ac8ed78ca 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -642,6 +642,4 @@ public class MockTransportService extends TransportService { } } } - - } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 891c3a402b6..ba831dde092 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -224,8 +224,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } catch (Exception e) { assertThat(e.getMessage(), false, equalTo(true)); } - - serviceA.removeHandler("sayHello"); } public void testThreadContext() throws ExecutionException, InterruptedException { @@ -281,8 +279,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { assertEquals("ping_user", threadPool.getThreadContext().getHeader("test.ping.user")); assertSame(context, threadPool.getThreadContext().getTransient("my_private_context")); assertNull("this header is only visible in the handler context", threadPool.getThreadContext().getHeader("some.temp.header")); - - serviceA.removeHandler("sayHello"); } public void testLocalNodeConnection() throws InterruptedException { @@ -375,8 +371,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } catch (Exception e) { assertThat(e.getMessage(), false, equalTo(true)); } - - serviceA.removeHandler("sayHello"); } public void testHelloWorldCompressed() { @@ -426,8 +420,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } catch (Exception e) { assertThat(e.getMessage(), false, equalTo(true)); } - - serviceA.removeHandler("sayHello"); } public void testErrorMessage() { @@ -469,8 +461,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } catch (Exception e) { assertThat(e.getCause().getMessage(), equalTo("runtime_exception: bad message !!!")); } - - serviceA.removeHandler("sayHelloException"); } public void testDisconnectListener() throws Exception { @@ -635,7 +625,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } catch (TransportException ex) { } - serviceA.removeHandler("sayHelloTimeoutDelayedResponse"); } public void testTimeoutSendExceptionWithNeverSendingBackResponse() throws Exception { @@ -678,8 +667,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } catch (Exception e) { assertThat(e, instanceOf(ReceiveTimeoutTransportException.class)); } - - serviceA.removeHandler("sayHelloTimeoutNoResponse"); } public void testTimeoutSendExceptionWithDelayedResponse() throws Exception { @@ -785,7 +772,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { for (Runnable runnable : assertions) { runnable.run(); } - serviceA.removeHandler("sayHelloTimeoutDelayedResponse"); waitForever.countDown(); doneWaitingForever.await(); assertTrue(inFlight.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS)); @@ -1325,8 +1311,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } catch (ConnectTransportException e) { // all is well } - - serviceA.removeHandler("sayHello"); } public void testMockUnresponsiveRule() { @@ -1385,8 +1369,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } catch (ConnectTransportException e) { // all is well } - - serviceA.removeHandler("sayHello"); } @@ -1721,4 +1703,16 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { serviceC.close(); } + + public void testRegisterHandlerTwice() { + serviceB.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), + (request, message) -> {throw new AssertionError("boom");}); + expectThrows(IllegalArgumentException.class, () -> + serviceB.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), + (request, message) -> {throw new AssertionError("boom");}) + ); + + serviceA.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), + (request, message) -> {throw new AssertionError("boom");}); + } }