Remove TransportService#registerRequestHandler leniency (#20469)

`TransportService#registerRequestHandler` allowed to register
handlers more than once and issues an annoying warn log message when
this happens. This change simple throws an exception to prevent regsitering
the same handler more than once. This commit also removes the ability
to remove request handlers.

Relates to #20468
This commit is contained in:
Simon Willnauer 2016-09-14 20:32:29 +02:00 committed by GitHub
parent 960efe6202
commit 17ddee7011
14 changed files with 74 additions and 88 deletions

View File

@ -96,7 +96,7 @@ public class DiscoveryNode implements Writeable, ToXContent {
* @param version the version of the node
*/
public DiscoveryNode(final String id, TransportAddress address, Version version) {
this(id, address, Collections.emptyMap(), Collections.emptySet(), version);
this(id, address, Collections.emptyMap(), EnumSet.allOf(Role.class), version);
}
/**

View File

@ -284,8 +284,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
protected void doClose() {
masterFD.close();
nodesFD.close();
publishClusterState.close();
membership.close();
pingService.close();
}

View File

@ -168,7 +168,6 @@ public class MasterFaultDetection extends FaultDetection {
super.close();
stop("closing");
this.listeners.clear();
transportService.removeHandler(MASTER_PING_ACTION_NAME);
}
@Override

View File

@ -139,7 +139,6 @@ public class NodesFaultDetection extends FaultDetection {
public void close() {
super.close();
stop();
transportService.removeHandler(PING_ACTION_NAME);
}
@Override

View File

@ -76,12 +76,6 @@ public class MembershipAction extends AbstractComponent {
transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new, ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler());
}
public void close() {
transportService.removeHandler(DISCOVERY_JOIN_ACTION_NAME);
transportService.removeHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME);
transportService.removeHandler(DISCOVERY_LEAVE_ACTION_NAME);
}
public void sendLeaveRequest(DiscoveryNode masterNode, DiscoveryNode node) {
transportService.sendRequest(node, DISCOVERY_LEAVE_ACTION_NAME, new LeaveRequest(masterNode), EmptyTransportResponseHandler.INSTANCE_SAME);
}

View File

@ -211,7 +211,6 @@ public class UnicastZenPing extends AbstractLifecycleComponent implements ZenPin
@Override
protected void doClose() {
transportService.removeHandler(ACTION_NAME);
ThreadPool.terminate(unicastConnectExecutor, 0, TimeUnit.SECONDS);
try {
IOUtils.close(receivedResponses.values());

View File

@ -107,11 +107,6 @@ public class PublishClusterStateAction extends AbstractComponent {
transportService.registerRequestHandler(COMMIT_ACTION_NAME, CommitClusterStateRequest::new, ThreadPool.Names.SAME, new CommitClusterStateRequestHandler());
}
public void close() {
transportService.removeHandler(SEND_ACTION_NAME);
transportService.removeHandler(COMMIT_ACTION_NAME);
}
public PendingClusterStatesQueue pendingStatesQueue() {
return pendingStatesQueue;
}

View File

@ -64,10 +64,6 @@ public class VerifyNodeRepositoryAction extends AbstractComponent {
transportService.registerRequestHandler(ACTION_NAME, VerifyNodeRepositoryRequest::new, ThreadPool.Names.SAME, new VerifyNodeRepositoryRequestHandler());
}
public void close() {
transportService.removeHandler(ACTION_NAME);
}
public void verify(String repository, String verificationToken, final ActionListener<VerifyResponse> listener) {
final DiscoveryNodes discoNodes = clusterService.state().nodes();
final DiscoveryNode localNode = discoNodes.getLocalNode();

View File

@ -620,19 +620,12 @@ public class TransportService extends AbstractLifecycleComponent {
registerRequestHandler(reg);
}
protected <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
private <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
synchronized (requestHandlerMutex) {
RequestHandlerRegistry replaced = requestHandlers.get(reg.getAction());
requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap();
if (replaced != null) {
logger.warn("registered two transport handlers for action {}, handlers: {}, {}", reg.getAction(), reg, replaced);
if (requestHandlers.containsKey(reg.getAction())) {
throw new IllegalArgumentException("transport handlers for action " + reg.getAction() + " is already registered");
}
}
}
public void removeHandler(String action) {
synchronized (requestHandlerMutex) {
requestHandlers = MapBuilder.newMapBuilder(requestHandlers).remove(action).immutableMap();
requestHandlers = MapBuilder.newMapBuilder(requestHandlers).put(reg.getAction(), reg).immutableMap();
}
}

View File

@ -131,6 +131,8 @@ public class TransportReplicationActionTests extends ESTestCase {
private TransportService transportService;
private CapturingTransport transport;
private Action action;
private ShardStateAction shardStateAction;
/* *
* TransportReplicationAction needs an instance of IndexShard to count operations.
* indexShards is reset to null before each test and will be initialized upon request in the tests.
@ -150,7 +152,8 @@ public class TransportReplicationActionTests extends ESTestCase {
transportService = new TransportService(clusterService.getSettings(), transport, threadPool);
transportService.start();
transportService.acceptIncomingRequests();
action = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool);
shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool);
action = new Action(Settings.EMPTY, "testAction", transportService, clusterService, shardStateAction, threadPool);
}
@After
@ -707,7 +710,8 @@ public class TransportReplicationActionTests extends ESTestCase {
final ShardRouting replicaRouting = state.getRoutingTable().shardRoutingTable(shardId).replicaShards().get(0);
boolean throwException = randomBoolean();
final ReplicationTask task = maybeTask();
Action action = new Action(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool) {
Action action = new Action(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, shardStateAction,
threadPool) {
@Override
protected ReplicaResult shardOperationOnReplica(Request request) {
assertIndexShardCounter(1);
@ -826,7 +830,8 @@ public class TransportReplicationActionTests extends ESTestCase {
setState(clusterService, state);
AtomicBoolean throwException = new AtomicBoolean(true);
final ReplicationTask task = maybeTask();
Action action = new Action(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool) {
Action action = new Action(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, shardStateAction,
threadPool) {
@Override
protected ReplicaResult shardOperationOnReplica(Request request) {
assertPhase(task, "replica");
@ -940,9 +945,10 @@ public class TransportReplicationActionTests extends ESTestCase {
Action(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService,
ShardStateAction shardStateAction,
ThreadPool threadPool) {
super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool,
new ShardStateAction(settings, clusterService, transportService, null, null, threadPool),
shardStateAction,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY),
Request::new, Request::new, ThreadPool.Names.SAME);
}

View File

@ -19,7 +19,9 @@
package org.elasticsearch.discovery.zen;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
@ -35,18 +37,18 @@ import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.discovery.zen.ping.ZenPingService;
import org.elasticsearch.discovery.zen.publish.PublishClusterStateActionTests.AssertingAckListener;
import org.elasticsearch.discovery.zen.publish.PublishClusterStateActionTests.MockNode;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -55,14 +57,11 @@ import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.discovery.zen.ZenDiscovery.shouldIgnoreOrRejectNewClusterState;
import static org.elasticsearch.discovery.zen.elect.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
import static org.elasticsearch.discovery.zen.publish.PublishClusterStateActionTests.createMockNode;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.elasticsearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
/**
*/
public class ZenDiscoveryUnitTests extends ESTestCase {
public void testShouldIgnoreNewClusterState() {
@ -154,59 +153,76 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
Settings settings = Settings.builder()
.put(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), Integer.toString(minMasterNodes)).build();
Map<String, MockNode> nodes = new HashMap<>();
ZenDiscovery zenDiscovery = null;
ClusterService clusterService = null;
ArrayList<Closeable> toClose = new ArrayList<>();
try {
Set<DiscoveryNode> expectedFDNodes = null;
// create master node and its mocked up services
MockNode master = createMockNode("master", settings, null, threadPool, logger, nodes).setAsMaster();
ClusterState state = master.clusterState; // initial cluster state
final MockTransportService masterTransport = MockTransportService.local(settings, Version.CURRENT, threadPool);
masterTransport.start();
DiscoveryNode masterNode = new DiscoveryNode("master", masterTransport.boundAddress().publishAddress(), Version.CURRENT);
toClose.add(masterTransport);
masterTransport.setLocalNode(masterNode);
ClusterState state = ClusterStateCreationUtils.state(masterNode, masterNode, masterNode);
// build the zen discovery and cluster service
clusterService = createClusterService(threadPool, master.discoveryNode);
setState(clusterService, state);
zenDiscovery = buildZenDiscovery(settings, master, clusterService, threadPool);
ClusterService masterClusterService = createClusterService(threadPool, masterNode);
toClose.add(masterClusterService);
// TODO: clustername shouldn't be stored twice in cluster service, but for now, work around it
state = ClusterState.builder(masterClusterService.getClusterName()).nodes(state.nodes()).build();
setState(masterClusterService, state);
ZenDiscovery masterZen = buildZenDiscovery(settings, masterTransport, masterClusterService, threadPool);
toClose.add(masterZen);
masterTransport.acceptIncomingRequests();
final MockTransportService otherTransport = MockTransportService.local(settings, Version.CURRENT, threadPool);
otherTransport.start();
toClose.add(otherTransport);
DiscoveryNode otherNode = new DiscoveryNode("other", otherTransport.boundAddress().publishAddress(), Version.CURRENT);
otherTransport.setLocalNode(otherNode);
final ClusterState otherState = ClusterState.builder(masterClusterService.getClusterName())
.nodes(DiscoveryNodes.builder().add(otherNode).localNodeId(otherNode.getId())).build();
ClusterService otherClusterService = createClusterService(threadPool, masterNode);
toClose.add(otherClusterService);
setState(otherClusterService, otherState);
ZenDiscovery otherZen = buildZenDiscovery(settings, otherTransport, otherClusterService, threadPool);
toClose.add(otherZen);
otherTransport.acceptIncomingRequests();
masterTransport.connectToNode(otherNode);
otherTransport.connectToNode(masterNode);
// a new cluster state with a new discovery node (we will test if the cluster state
// was updated by the presence of this node in NodesFaultDetection)
MockNode newNode = createMockNode("new_node", settings, null, threadPool, logger, nodes);
ClusterState newState = ClusterState.builder(state).incrementVersion().nodes(
DiscoveryNodes.builder(state.nodes()).add(newNode.discoveryNode).masterNodeId(master.discoveryNode.getId())
ClusterState newState = ClusterState.builder(masterClusterService.state()).incrementVersion().nodes(
DiscoveryNodes.builder(state.nodes()).add(otherNode).masterNodeId(masterNode.getId())
).build();
try {
// publishing a new cluster state
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent("testing", newState, state);
AssertingAckListener listener = new AssertingAckListener(newState.nodes().getSize() - 1);
expectedFDNodes = zenDiscovery.getFaultDetectionNodes();
zenDiscovery.publish(clusterChangedEvent, listener);
expectedFDNodes = masterZen.getFaultDetectionNodes();
masterZen.publish(clusterChangedEvent, listener);
listener.await(1, TimeUnit.HOURS);
// publish was a success, update expected FD nodes based on new cluster state
expectedFDNodes = fdNodesForState(newState, master.discoveryNode);
expectedFDNodes = fdNodesForState(newState, masterNode);
} catch (Discovery.FailedToCommitClusterStateException e) {
// not successful, so expectedFDNodes above should remain what it was originally assigned
assertEquals(3, minMasterNodes); // ensure min master nodes is the higher value, otherwise we shouldn't fail
}
assertEquals(expectedFDNodes, zenDiscovery.getFaultDetectionNodes());
assertEquals(expectedFDNodes, masterZen.getFaultDetectionNodes());
} finally {
// clean close of transport service and publish action for each node
zenDiscovery.close();
clusterService.close();
for (MockNode curNode : nodes.values()) {
curNode.action.close();
curNode.service.close();
}
IOUtils.close(toClose);
terminate(threadPool);
}
}
private ZenDiscovery buildZenDiscovery(Settings settings, MockNode master, ClusterService clusterService, ThreadPool threadPool) {
private ZenDiscovery buildZenDiscovery(Settings settings, TransportService service, ClusterService clusterService, ThreadPool threadPool) {
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ZenPingService zenPingService = new ZenPingService(settings, Collections.emptySet());
ElectMasterService electMasterService = new ElectMasterService(settings);
ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, master.service, clusterService,
ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service, clusterService,
clusterSettings, zenPingService, electMasterService);
zenDiscovery.start();
return zenDiscovery;

View File

@ -223,7 +223,6 @@ public class PublishClusterStateActionTests extends ESTestCase {
public void tearDown() throws Exception {
super.tearDown();
for (MockNode curNode : nodes.values()) {
curNode.action.close();
curNode.service.close();
}
terminate(threadPool);

View File

@ -642,6 +642,4 @@ public class MockTransportService extends TransportService {
}
}
}
}

View File

@ -224,8 +224,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
} catch (Exception e) {
assertThat(e.getMessage(), false, equalTo(true));
}
serviceA.removeHandler("sayHello");
}
public void testThreadContext() throws ExecutionException, InterruptedException {
@ -281,8 +279,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
assertEquals("ping_user", threadPool.getThreadContext().getHeader("test.ping.user"));
assertSame(context, threadPool.getThreadContext().getTransient("my_private_context"));
assertNull("this header is only visible in the handler context", threadPool.getThreadContext().getHeader("some.temp.header"));
serviceA.removeHandler("sayHello");
}
public void testLocalNodeConnection() throws InterruptedException {
@ -375,8 +371,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
} catch (Exception e) {
assertThat(e.getMessage(), false, equalTo(true));
}
serviceA.removeHandler("sayHello");
}
public void testHelloWorldCompressed() {
@ -426,8 +420,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
} catch (Exception e) {
assertThat(e.getMessage(), false, equalTo(true));
}
serviceA.removeHandler("sayHello");
}
public void testErrorMessage() {
@ -469,8 +461,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
} catch (Exception e) {
assertThat(e.getCause().getMessage(), equalTo("runtime_exception: bad message !!!"));
}
serviceA.removeHandler("sayHelloException");
}
public void testDisconnectListener() throws Exception {
@ -635,7 +625,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
} catch (TransportException ex) {
}
serviceA.removeHandler("sayHelloTimeoutDelayedResponse");
}
public void testTimeoutSendExceptionWithNeverSendingBackResponse() throws Exception {
@ -678,8 +667,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
} catch (Exception e) {
assertThat(e, instanceOf(ReceiveTimeoutTransportException.class));
}
serviceA.removeHandler("sayHelloTimeoutNoResponse");
}
public void testTimeoutSendExceptionWithDelayedResponse() throws Exception {
@ -785,7 +772,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
for (Runnable runnable : assertions) {
runnable.run();
}
serviceA.removeHandler("sayHelloTimeoutDelayedResponse");
waitForever.countDown();
doneWaitingForever.await();
assertTrue(inFlight.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS));
@ -1325,8 +1311,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
} catch (ConnectTransportException e) {
// all is well
}
serviceA.removeHandler("sayHello");
}
public void testMockUnresponsiveRule() {
@ -1385,8 +1369,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
} catch (ConnectTransportException e) {
// all is well
}
serviceA.removeHandler("sayHello");
}
@ -1721,4 +1703,16 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
serviceC.close();
}
public void testRegisterHandlerTwice() {
serviceB.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC),
(request, message) -> {throw new AssertionError("boom");});
expectThrows(IllegalArgumentException.class, () ->
serviceB.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC),
(request, message) -> {throw new AssertionError("boom");})
);
serviceA.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC),
(request, message) -> {throw new AssertionError("boom");});
}
}