diff --git a/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java b/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java index 0c39c43bc9f..55be77d201f 100644 --- a/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java +++ b/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java @@ -96,7 +96,7 @@ public class DiscoveryNode implements Writeable, ToXContent { * @param version the version of the node */ public DiscoveryNode(final String id, TransportAddress address, Version version) { - this(id, address, Collections.emptyMap(), Collections.emptySet(), version); + this(id, address, Collections.emptyMap(), EnumSet.allOf(Role.class), version); } /** diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index f419da06e68..132505fb403 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -284,8 +284,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover protected void doClose() { masterFD.close(); nodesFD.close(); - publishClusterState.close(); - membership.close(); pingService.close(); } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java b/core/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java index 6dc89998046..04aee9db3d8 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java @@ -168,7 +168,6 @@ public class MasterFaultDetection extends FaultDetection { super.close(); stop("closing"); this.listeners.clear(); - transportService.removeHandler(MASTER_PING_ACTION_NAME); } @Override diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java b/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java index 0ab5bde25cd..6361d3cde39 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java @@ -139,7 +139,6 @@ public class NodesFaultDetection extends FaultDetection { public void close() { super.close(); stop(); - transportService.removeHandler(PING_ACTION_NAME); } @Override diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java index 961b8d79728..8740d12c5f7 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java @@ -76,12 +76,6 @@ public class MembershipAction extends AbstractComponent { transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new, ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler()); } - public void close() { - transportService.removeHandler(DISCOVERY_JOIN_ACTION_NAME); - transportService.removeHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME); - transportService.removeHandler(DISCOVERY_LEAVE_ACTION_NAME); - } - public void sendLeaveRequest(DiscoveryNode masterNode, DiscoveryNode node) { transportService.sendRequest(node, DISCOVERY_LEAVE_ACTION_NAME, new LeaveRequest(masterNode), EmptyTransportResponseHandler.INSTANCE_SAME); } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index 231fa3ee070..afe4902f887 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -211,7 +211,6 @@ public class UnicastZenPing extends AbstractLifecycleComponent implements ZenPin @Override protected void doClose() { - transportService.removeHandler(ACTION_NAME); ThreadPool.terminate(unicastConnectExecutor, 0, TimeUnit.SECONDS); try { IOUtils.close(receivedResponses.values()); diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index 06c25ebf81a..870e34cc1f3 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -107,11 +107,6 @@ public class PublishClusterStateAction extends AbstractComponent { transportService.registerRequestHandler(COMMIT_ACTION_NAME, CommitClusterStateRequest::new, ThreadPool.Names.SAME, new CommitClusterStateRequestHandler()); } - public void close() { - transportService.removeHandler(SEND_ACTION_NAME); - transportService.removeHandler(COMMIT_ACTION_NAME); - } - public PendingClusterStatesQueue pendingStatesQueue() { return pendingStatesQueue; } diff --git a/core/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java b/core/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java index 49edae3ce22..cc1170a4841 100644 --- a/core/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java +++ b/core/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java @@ -64,10 +64,6 @@ public class VerifyNodeRepositoryAction extends AbstractComponent { transportService.registerRequestHandler(ACTION_NAME, VerifyNodeRepositoryRequest::new, ThreadPool.Names.SAME, new VerifyNodeRepositoryRequestHandler()); } - public void close() { - transportService.removeHandler(ACTION_NAME); - } - public void verify(String repository, String verificationToken, final ActionListener listener) { final DiscoveryNodes discoNodes = clusterService.state().nodes(); final DiscoveryNode localNode = discoNodes.getLocalNode(); diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index 1c807553a24..20b8c77d44a 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -620,19 +620,12 @@ public class TransportService extends AbstractLifecycleComponent { registerRequestHandler(reg); } - protected void registerRequestHandler(RequestHandlerRegistry reg) { + private void registerRequestHandler(RequestHandlerRegistry reg) { synchronized (requestHandlerMutex) { - RequestHandlerRegistry replaced = requestHandlers.get(reg.getAction()); - requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap(); - if (replaced != null) { - logger.warn("registered two transport handlers for action {}, handlers: {}, {}", reg.getAction(), reg, replaced); + if (requestHandlers.containsKey(reg.getAction())) { + throw new IllegalArgumentException("transport handlers for action " + reg.getAction() + " is already registered"); } - } - } - - public void removeHandler(String action) { - synchronized (requestHandlerMutex) { - requestHandlers = MapBuilder.newMapBuilder(requestHandlers).remove(action).immutableMap(); + requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap(); } } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index bda117642a0..f821f82c33a 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -131,6 +131,8 @@ public class TransportReplicationActionTests extends ESTestCase { private TransportService transportService; private CapturingTransport transport; private Action action; + private ShardStateAction shardStateAction; + /* * * TransportReplicationAction needs an instance of IndexShard to count operations. * indexShards is reset to null before each test and will be initialized upon request in the tests. @@ -150,7 +152,8 @@ public class TransportReplicationActionTests extends ESTestCase { transportService = new TransportService(clusterService.getSettings(), transport, threadPool); transportService.start(); transportService.acceptIncomingRequests(); - action = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool); + shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool); + action = new Action(Settings.EMPTY, "testAction", transportService, clusterService, shardStateAction, threadPool); } @After @@ -707,7 +710,8 @@ public class TransportReplicationActionTests extends ESTestCase { final ShardRouting replicaRouting = state.getRoutingTable().shardRoutingTable(shardId).replicaShards().get(0); boolean throwException = randomBoolean(); final ReplicationTask task = maybeTask(); - Action action = new Action(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool) { + Action action = new Action(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, shardStateAction, + threadPool) { @Override protected ReplicaResult shardOperationOnReplica(Request request) { assertIndexShardCounter(1); @@ -826,7 +830,8 @@ public class TransportReplicationActionTests extends ESTestCase { setState(clusterService, state); AtomicBoolean throwException = new AtomicBoolean(true); final ReplicationTask task = maybeTask(); - Action action = new Action(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool) { + Action action = new Action(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, shardStateAction, + threadPool) { @Override protected ReplicaResult shardOperationOnReplica(Request request) { assertPhase(task, "replica"); @@ -940,9 +945,10 @@ public class TransportReplicationActionTests extends ESTestCase { Action(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, + ShardStateAction shardStateAction, ThreadPool threadPool) { super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool, - new ShardStateAction(settings, clusterService, transportService, null, null, threadPool), + shardStateAction, new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), Request::new, Request::new, ThreadPool.Names.SAME); } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java index 235df2d8a35..b9f65016048 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java @@ -19,7 +19,9 @@ package org.elasticsearch.discovery.zen; +import org.apache.lucene.util.IOUtils; import org.elasticsearch.Version; +import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -35,18 +37,18 @@ import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.discovery.zen.ping.ZenPingService; import org.elasticsearch.discovery.zen.publish.PublishClusterStateActionTests.AssertingAckListener; -import org.elasticsearch.discovery.zen.publish.PublishClusterStateActionTests.MockNode; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import java.io.Closeable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -55,14 +57,11 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.elasticsearch.discovery.zen.ZenDiscovery.shouldIgnoreOrRejectNewClusterState; import static org.elasticsearch.discovery.zen.elect.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING; -import static org.elasticsearch.discovery.zen.publish.PublishClusterStateActionTests.createMockNode; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; import static org.elasticsearch.test.ClusterServiceUtils.setState; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; -/** - */ public class ZenDiscoveryUnitTests extends ESTestCase { public void testShouldIgnoreNewClusterState() { @@ -154,59 +153,76 @@ public class ZenDiscoveryUnitTests extends ESTestCase { Settings settings = Settings.builder() .put(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), Integer.toString(minMasterNodes)).build(); - Map nodes = new HashMap<>(); - ZenDiscovery zenDiscovery = null; - ClusterService clusterService = null; + ArrayList toClose = new ArrayList<>(); try { Set expectedFDNodes = null; - // create master node and its mocked up services - MockNode master = createMockNode("master", settings, null, threadPool, logger, nodes).setAsMaster(); - ClusterState state = master.clusterState; // initial cluster state + final MockTransportService masterTransport = MockTransportService.local(settings, Version.CURRENT, threadPool); + masterTransport.start(); + DiscoveryNode masterNode = new DiscoveryNode("master", masterTransport.boundAddress().publishAddress(), Version.CURRENT); + toClose.add(masterTransport); + masterTransport.setLocalNode(masterNode); + ClusterState state = ClusterStateCreationUtils.state(masterNode, masterNode, masterNode); // build the zen discovery and cluster service - clusterService = createClusterService(threadPool, master.discoveryNode); - setState(clusterService, state); - zenDiscovery = buildZenDiscovery(settings, master, clusterService, threadPool); + ClusterService masterClusterService = createClusterService(threadPool, masterNode); + toClose.add(masterClusterService); + // TODO: clustername shouldn't be stored twice in cluster service, but for now, work around it + state = ClusterState.builder(masterClusterService.getClusterName()).nodes(state.nodes()).build(); + setState(masterClusterService, state); + ZenDiscovery masterZen = buildZenDiscovery(settings, masterTransport, masterClusterService, threadPool); + toClose.add(masterZen); + masterTransport.acceptIncomingRequests(); + + final MockTransportService otherTransport = MockTransportService.local(settings, Version.CURRENT, threadPool); + otherTransport.start(); + toClose.add(otherTransport); + DiscoveryNode otherNode = new DiscoveryNode("other", otherTransport.boundAddress().publishAddress(), Version.CURRENT); + otherTransport.setLocalNode(otherNode); + final ClusterState otherState = ClusterState.builder(masterClusterService.getClusterName()) + .nodes(DiscoveryNodes.builder().add(otherNode).localNodeId(otherNode.getId())).build(); + ClusterService otherClusterService = createClusterService(threadPool, masterNode); + toClose.add(otherClusterService); + setState(otherClusterService, otherState); + ZenDiscovery otherZen = buildZenDiscovery(settings, otherTransport, otherClusterService, threadPool); + toClose.add(otherZen); + otherTransport.acceptIncomingRequests(); + + + masterTransport.connectToNode(otherNode); + otherTransport.connectToNode(masterNode); // a new cluster state with a new discovery node (we will test if the cluster state // was updated by the presence of this node in NodesFaultDetection) - MockNode newNode = createMockNode("new_node", settings, null, threadPool, logger, nodes); - ClusterState newState = ClusterState.builder(state).incrementVersion().nodes( - DiscoveryNodes.builder(state.nodes()).add(newNode.discoveryNode).masterNodeId(master.discoveryNode.getId()) + ClusterState newState = ClusterState.builder(masterClusterService.state()).incrementVersion().nodes( + DiscoveryNodes.builder(state.nodes()).add(otherNode).masterNodeId(masterNode.getId()) ).build(); try { // publishing a new cluster state ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent("testing", newState, state); AssertingAckListener listener = new AssertingAckListener(newState.nodes().getSize() - 1); - expectedFDNodes = zenDiscovery.getFaultDetectionNodes(); - zenDiscovery.publish(clusterChangedEvent, listener); + expectedFDNodes = masterZen.getFaultDetectionNodes(); + masterZen.publish(clusterChangedEvent, listener); listener.await(1, TimeUnit.HOURS); // publish was a success, update expected FD nodes based on new cluster state - expectedFDNodes = fdNodesForState(newState, master.discoveryNode); + expectedFDNodes = fdNodesForState(newState, masterNode); } catch (Discovery.FailedToCommitClusterStateException e) { // not successful, so expectedFDNodes above should remain what it was originally assigned assertEquals(3, minMasterNodes); // ensure min master nodes is the higher value, otherwise we shouldn't fail } - assertEquals(expectedFDNodes, zenDiscovery.getFaultDetectionNodes()); + assertEquals(expectedFDNodes, masterZen.getFaultDetectionNodes()); } finally { - // clean close of transport service and publish action for each node - zenDiscovery.close(); - clusterService.close(); - for (MockNode curNode : nodes.values()) { - curNode.action.close(); - curNode.service.close(); - } + IOUtils.close(toClose); terminate(threadPool); } } - private ZenDiscovery buildZenDiscovery(Settings settings, MockNode master, ClusterService clusterService, ThreadPool threadPool) { + private ZenDiscovery buildZenDiscovery(Settings settings, TransportService service, ClusterService clusterService, ThreadPool threadPool) { ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ZenPingService zenPingService = new ZenPingService(settings, Collections.emptySet()); ElectMasterService electMasterService = new ElectMasterService(settings); - ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, master.service, clusterService, + ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service, clusterService, clusterSettings, zenPingService, electMasterService); zenDiscovery.start(); return zenDiscovery; diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java index e3931a6da75..50ec06694fe 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java @@ -223,7 +223,6 @@ public class PublishClusterStateActionTests extends ESTestCase { public void tearDown() throws Exception { super.tearDown(); for (MockNode curNode : nodes.values()) { - curNode.action.close(); curNode.service.close(); } terminate(threadPool); diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 9c4612b22b3..90ac8ed78ca 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -642,6 +642,4 @@ public class MockTransportService extends TransportService { } } } - - } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 891c3a402b6..ba831dde092 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -224,8 +224,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } catch (Exception e) { assertThat(e.getMessage(), false, equalTo(true)); } - - serviceA.removeHandler("sayHello"); } public void testThreadContext() throws ExecutionException, InterruptedException { @@ -281,8 +279,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { assertEquals("ping_user", threadPool.getThreadContext().getHeader("test.ping.user")); assertSame(context, threadPool.getThreadContext().getTransient("my_private_context")); assertNull("this header is only visible in the handler context", threadPool.getThreadContext().getHeader("some.temp.header")); - - serviceA.removeHandler("sayHello"); } public void testLocalNodeConnection() throws InterruptedException { @@ -375,8 +371,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } catch (Exception e) { assertThat(e.getMessage(), false, equalTo(true)); } - - serviceA.removeHandler("sayHello"); } public void testHelloWorldCompressed() { @@ -426,8 +420,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } catch (Exception e) { assertThat(e.getMessage(), false, equalTo(true)); } - - serviceA.removeHandler("sayHello"); } public void testErrorMessage() { @@ -469,8 +461,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } catch (Exception e) { assertThat(e.getCause().getMessage(), equalTo("runtime_exception: bad message !!!")); } - - serviceA.removeHandler("sayHelloException"); } public void testDisconnectListener() throws Exception { @@ -635,7 +625,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } catch (TransportException ex) { } - serviceA.removeHandler("sayHelloTimeoutDelayedResponse"); } public void testTimeoutSendExceptionWithNeverSendingBackResponse() throws Exception { @@ -678,8 +667,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } catch (Exception e) { assertThat(e, instanceOf(ReceiveTimeoutTransportException.class)); } - - serviceA.removeHandler("sayHelloTimeoutNoResponse"); } public void testTimeoutSendExceptionWithDelayedResponse() throws Exception { @@ -785,7 +772,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { for (Runnable runnable : assertions) { runnable.run(); } - serviceA.removeHandler("sayHelloTimeoutDelayedResponse"); waitForever.countDown(); doneWaitingForever.await(); assertTrue(inFlight.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS)); @@ -1325,8 +1311,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } catch (ConnectTransportException e) { // all is well } - - serviceA.removeHandler("sayHello"); } public void testMockUnresponsiveRule() { @@ -1385,8 +1369,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } catch (ConnectTransportException e) { // all is well } - - serviceA.removeHandler("sayHello"); } @@ -1721,4 +1703,16 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { serviceC.close(); } + + public void testRegisterHandlerTwice() { + serviceB.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), + (request, message) -> {throw new AssertionError("boom");}); + expectThrows(IllegalArgumentException.class, () -> + serviceB.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), + (request, message) -> {throw new AssertionError("boom");}) + ); + + serviceA.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), + (request, message) -> {throw new AssertionError("boom");}); + } }