ZenDiscoveryUnitTests should close objects in reverse order of creation

One needs to close the higher level objects (like UnicastZenPing) before closing the transport service. The latter can throw assertions w.r.t open connections
This commit is contained in:
Boaz Leskes 2017-01-10 14:11:27 +01:00
parent f387848f83
commit 9aba49c571
1 changed files with 25 additions and 25 deletions

View File

@ -19,19 +19,6 @@
package org.elasticsearch.discovery.zen; package org.elasticsearch.discovery.zen;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
@ -52,7 +39,6 @@ import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.PublishClusterStateActionTests.AssertingAckListener; import org.elasticsearch.discovery.zen.PublishClusterStateActionTests.AssertingAckListener;
@ -67,6 +53,20 @@ import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseOptions; import org.elasticsearch.transport.TransportResponseOptions;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet; import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE;
@ -173,38 +173,38 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
Settings settings = Settings.builder() Settings settings = Settings.builder()
.put(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), Integer.toString(minMasterNodes)).build(); .put(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), Integer.toString(minMasterNodes)).build();
ArrayList<Closeable> toClose = new ArrayList<>(); ArrayDeque<Closeable> toClose = new ArrayDeque<>();
try { try {
Set<DiscoveryNode> expectedFDNodes = null; Set<DiscoveryNode> expectedFDNodes = null;
final MockTransportService masterTransport = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null); final MockTransportService masterTransport = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null);
masterTransport.start(); masterTransport.start();
DiscoveryNode masterNode = new DiscoveryNode("master", masterTransport.boundAddress().publishAddress(), Version.CURRENT); DiscoveryNode masterNode = new DiscoveryNode("master", masterTransport.boundAddress().publishAddress(), Version.CURRENT);
toClose.add(masterTransport); toClose.addFirst(masterTransport);
masterTransport.setLocalNode(masterNode); masterTransport.setLocalNode(masterNode);
ClusterState state = ClusterStateCreationUtils.state(masterNode, masterNode, masterNode); ClusterState state = ClusterStateCreationUtils.state(masterNode, masterNode, masterNode);
// build the zen discovery and cluster service // build the zen discovery and cluster service
ClusterService masterClusterService = createClusterService(threadPool, masterNode); ClusterService masterClusterService = createClusterService(threadPool, masterNode);
toClose.add(masterClusterService); toClose.addFirst(masterClusterService);
// TODO: clustername shouldn't be stored twice in cluster service, but for now, work around it // TODO: clustername shouldn't be stored twice in cluster service, but for now, work around it
state = ClusterState.builder(masterClusterService.getClusterName()).nodes(state.nodes()).build(); state = ClusterState.builder(masterClusterService.getClusterName()).nodes(state.nodes()).build();
setState(masterClusterService, state); setState(masterClusterService, state);
ZenDiscovery masterZen = buildZenDiscovery(settings, masterTransport, masterClusterService, threadPool); ZenDiscovery masterZen = buildZenDiscovery(settings, masterTransport, masterClusterService, threadPool);
toClose.add(masterZen); toClose.addFirst(masterZen);
masterTransport.acceptIncomingRequests(); masterTransport.acceptIncomingRequests();
final MockTransportService otherTransport = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null); final MockTransportService otherTransport = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null);
otherTransport.start(); otherTransport.start();
toClose.add(otherTransport); toClose.addFirst(otherTransport);
DiscoveryNode otherNode = new DiscoveryNode("other", otherTransport.boundAddress().publishAddress(), Version.CURRENT); DiscoveryNode otherNode = new DiscoveryNode("other", otherTransport.boundAddress().publishAddress(), Version.CURRENT);
otherTransport.setLocalNode(otherNode); otherTransport.setLocalNode(otherNode);
final ClusterState otherState = ClusterState.builder(masterClusterService.getClusterName()) final ClusterState otherState = ClusterState.builder(masterClusterService.getClusterName())
.nodes(DiscoveryNodes.builder().add(otherNode).localNodeId(otherNode.getId())).build(); .nodes(DiscoveryNodes.builder().add(otherNode).localNodeId(otherNode.getId())).build();
ClusterService otherClusterService = createClusterService(threadPool, masterNode); ClusterService otherClusterService = createClusterService(threadPool, masterNode);
toClose.add(otherClusterService); toClose.addFirst(otherClusterService);
setState(otherClusterService, otherState); setState(otherClusterService, otherState);
ZenDiscovery otherZen = buildZenDiscovery(settings, otherTransport, otherClusterService, threadPool); ZenDiscovery otherZen = buildZenDiscovery(settings, otherTransport, otherClusterService, threadPool);
toClose.add(otherZen); toClose.addFirst(otherZen);
otherTransport.acceptIncomingRequests(); otherTransport.acceptIncomingRequests();
masterTransport.connectToNode(otherNode); masterTransport.connectToNode(otherNode);
@ -244,21 +244,21 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
Settings settings = Settings.builder() Settings settings = Settings.builder()
.put(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), Integer.toString(minMasterNodes)).build(); .put(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), Integer.toString(minMasterNodes)).build();
ArrayList<Closeable> toClose = new ArrayList<>(); ArrayDeque<Closeable> toClose = new ArrayDeque<>();
try { try {
final MockTransportService masterTransport = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null); final MockTransportService masterTransport = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null);
masterTransport.start(); masterTransport.start();
DiscoveryNode masterNode = new DiscoveryNode("master", masterTransport.boundAddress().publishAddress(), Version.CURRENT); DiscoveryNode masterNode = new DiscoveryNode("master", masterTransport.boundAddress().publishAddress(), Version.CURRENT);
toClose.add(masterTransport); toClose.addFirst(masterTransport);
masterTransport.setLocalNode(masterNode); masterTransport.setLocalNode(masterNode);
ClusterState state = ClusterStateCreationUtils.state(masterNode, null, masterNode); ClusterState state = ClusterStateCreationUtils.state(masterNode, null, masterNode);
// build the zen discovery and cluster service // build the zen discovery and cluster service
ClusterService masterClusterService = createClusterService(threadPool, masterNode); ClusterService masterClusterService = createClusterService(threadPool, masterNode);
toClose.add(masterClusterService); toClose.addFirst(masterClusterService);
state = ClusterState.builder(masterClusterService.getClusterName()).nodes(state.nodes()).build(); state = ClusterState.builder(masterClusterService.getClusterName()).nodes(state.nodes()).build();
setState(masterClusterService, state); setState(masterClusterService, state);
ZenDiscovery masterZen = buildZenDiscovery(settings, masterTransport, masterClusterService, threadPool); ZenDiscovery masterZen = buildZenDiscovery(settings, masterTransport, masterClusterService, threadPool);
toClose.add(masterZen); toClose.addFirst(masterZen);
masterTransport.acceptIncomingRequests(); masterTransport.acceptIncomingRequests();
// inject a pending cluster state // inject a pending cluster state