Tests: Add TestZenDiscovery and replace uses of MockZenPing with it (#21488)

This changes adds a test discovery (which internally uses the existing
mock zenping by default). Having the mock the test framework selects be a discovery
greatly simplifies discovery setup (no more weird callback to a Node
method).
This commit is contained in:
Ryan Ernst 2016-11-14 21:46:10 -08:00 committed by GitHub
parent 2cdf3eeae0
commit c7bd4f3454
23 changed files with 247 additions and 180 deletions

View File

@ -53,11 +53,9 @@ public class DiscoveryModule {
new Setting<>("discovery.zen.hosts_provider", (String)null, Optional::ofNullable, Property.NodeScope);
private final Discovery discovery;
private final ZenPing zenPing;
public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportService transportService, NetworkService networkService,
ClusterService clusterService, Function<UnicastHostsProvider, ZenPing> createZenPing,
List<DiscoveryPlugin> plugins) {
ClusterService clusterService, List<DiscoveryPlugin> plugins) {
final UnicastHostsProvider hostsProvider;
Map<String, Supplier<UnicastHostsProvider>> hostProviders = new HashMap<>();
@ -79,14 +77,11 @@ public class DiscoveryModule {
hostsProvider = Collections::emptyList;
}
zenPing = createZenPing.apply(hostsProvider);
Map<String, Supplier<Discovery>> discoveryTypes = new HashMap<>();
discoveryTypes.put("zen",
() -> new ZenDiscovery(settings, threadPool, transportService, clusterService, clusterService.getClusterSettings(), zenPing));
discoveryTypes.put("zen", () -> new ZenDiscovery(settings, threadPool, transportService, clusterService, hostsProvider));
discoveryTypes.put("none", () -> new NoneDiscovery(settings, clusterService, clusterService.getClusterSettings()));
for (DiscoveryPlugin plugin : plugins) {
plugin.getDiscoveryTypes(threadPool, transportService, clusterService, zenPing).entrySet().forEach(entry -> {
plugin.getDiscoveryTypes(threadPool, transportService, clusterService, hostsProvider).entrySet().forEach(entry -> {
if (discoveryTypes.put(entry.getKey(), entry.getValue()) != null) {
throw new IllegalArgumentException("Cannot register discovery type [" + entry.getKey() + "] twice");
}
@ -103,9 +98,4 @@ public class DiscoveryModule {
public Discovery getDiscovery() {
return discovery;
}
// TODO: remove this, it should be completely local to discovery, but service disruption tests want to mess with it
public ZenPing getZenPing() {
return zenPing;
}
}

View File

@ -107,7 +107,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
private AllocationService allocationService;
private final ClusterName clusterName;
private final DiscoverySettings discoverySettings;
private final ZenPing zenPing;
protected final ZenPing zenPing; // protected to allow tests access
private final MasterFaultDetection masterFD;
private final NodesFaultDetection nodesFD;
private final PublishClusterStateAction publishClusterState;
@ -139,13 +139,13 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
private volatile NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;
public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, ClusterSettings clusterSettings, ZenPing zenPing) {
ClusterService clusterService, UnicastHostsProvider hostsProvider) {
super(settings);
this.clusterService = clusterService;
this.clusterName = clusterService.getClusterName();
this.transportService = transportService;
this.discoverySettings = new DiscoverySettings(settings, clusterSettings);
this.zenPing = zenPing;
this.discoverySettings = new DiscoverySettings(settings, clusterService.getClusterSettings());
this.zenPing = newZenPing(settings, threadPool, transportService, hostsProvider);
this.electMaster = new ElectMasterService(settings);
this.pingTimeout = PING_TIMEOUT_SETTING.get(settings);
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
@ -160,11 +160,14 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
logger.debug("using ping_timeout [{}], join.timeout [{}], master_election.ignore_non_master [{}]",
this.pingTimeout, joinTimeout, masterElectionIgnoreNonMasters);
clusterSettings.addSettingsUpdateConsumer(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING, this::handleMinimumMasterNodesChanged, (value) -> {
clusterService.getClusterSettings().addSettingsUpdateConsumer(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
this::handleMinimumMasterNodesChanged, (value) -> {
final ClusterState clusterState = clusterService.state();
int masterNodes = clusterState.nodes().getMasterNodes().size();
if (value > masterNodes) {
throw new IllegalArgumentException("cannot set " + ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey() + " to more than the current master nodes count [" + masterNodes + "]");
throw new IllegalArgumentException("cannot set "
+ ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey() + " to more than the current" +
" master nodes count [" + masterNodes + "]");
}
});
@ -188,6 +191,12 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
DISCOVERY_REJOIN_ACTION_NAME, RejoinClusterRequest::new, ThreadPool.Names.SAME, new RejoinClusterRequestHandler());
}
// protected to allow overriding in tests
protected ZenPing newZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
UnicastHostsProvider hostsProvider) {
return new UnicastZenPing(settings, threadPool, transportService, hostsProvider);
}
@Override
public void setAllocationService(AllocationService allocationService) {
this.allocationService = allocationService;

View File

@ -399,8 +399,7 @@ public class Node implements Closeable {
}
final DiscoveryModule discoveryModule = new DiscoveryModule(this.settings, threadPool, transportService,
networkService, clusterService, hostsProvider -> newZenPing(settings, threadPool, transportService, hostsProvider),
pluginsService.filterPlugins(DiscoveryPlugin.class));
networkService, clusterService, pluginsService.filterPlugins(DiscoveryPlugin.class));
pluginsService.processModules(modules);
modules.add(b -> {
b.bind(IndicesQueriesRegistry.class).toInstance(searchModule.getQueryParserRegistry());
@ -434,7 +433,6 @@ public class Node implements Closeable {
indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings()));
b.bind(ClusterInfoService.class).toInstance(clusterInfoService);
b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
b.bind(ZenPing.class).toInstance(discoveryModule.getZenPing());
{
RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
@ -873,12 +871,6 @@ public class Node implements Closeable {
return customNameResolvers;
}
/** Create a new ZenPing instance for use in zen discovery. */
protected ZenPing newZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
UnicastHostsProvider hostsProvider) {
return new UnicastZenPing(settings, threadPool, transportService, hostsProvider);
}
/** Constructs an internal node used as a client into a cluster fronted by this tribe node. */
protected Node newTribeClientNode(Settings settings, Collection<Class<? extends Plugin>> classpathPlugins) {
return new Node(new Environment(settings), classpathPlugins);

View File

@ -57,10 +57,10 @@ public interface DiscoveryPlugin {
* @param threadPool Use to schedule ping actions
* @param transportService Use to communicate with other nodes
* @param clusterService Use to find current nodes in the cluster
* @param zenPing Use to ping other nodes with zen unicast host list
* @param hostsProvider Use to find configured hosts which should be pinged for initial discovery
*/
default Map<String, Supplier<Discovery>> getDiscoveryTypes(ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, ZenPing zenPing) {
ClusterService clusterService, UnicastHostsProvider hostsProvider) {
return Collections.emptyMap();
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.discovery.zen.FaultDetection;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect;
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
@ -45,11 +46,6 @@ import static org.hamcrest.Matchers.equalTo;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class IndexingMasterFailoverIT extends ESIntegTestCase {
@Override
protected boolean addMockZenPings() {
return false;
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
final HashSet<Class<? extends Plugin>> classes = new HashSet<>(super.nodePlugins());
@ -57,6 +53,12 @@ public class IndexingMasterFailoverIT extends ESIntegTestCase {
return classes;
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false).build();
}
/**
* Indexing operations which entail mapping changes require a blocking request to the master node to update the mapping.
* If the master node is being disrupted or if it cannot commit cluster state changes, it needs to retry within timeout limits.

View File

@ -19,6 +19,9 @@
package org.elasticsearch.client.transport;
import java.io.IOException;
import java.util.Arrays;
import org.elasticsearch.Version;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -32,14 +35,11 @@ import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.discovery.MockZenPing;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.transport.MockTcpTransportPlugin;
import org.elasticsearch.transport.MockTransportClient;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Arrays;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
@ -66,7 +66,7 @@ public class TransportClientIT extends ESIntegTestCase {
.put(NetworkModule.HTTP_ENABLED.getKey(), false)
.put(Node.NODE_DATA_SETTING.getKey(), false)
.put("cluster.name", "foobar")
.build(), Arrays.asList(MockTcpTransportPlugin.class, MockZenPing.TestPlugin.class)).start()) {
.build(), Arrays.asList(MockTcpTransportPlugin.class, TestZenDiscovery.TestPlugin.class)).start()) {
TransportAddress transportAddress = node.injector().getInstance(TransportService.class).boundAddress().publishAddress();
client.addTransportAddress(transportAddress);
// since we force transport clients there has to be one node started that we connect to.

View File

@ -36,6 +36,7 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDelay;
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
@ -74,8 +75,9 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
}
@Override
protected boolean addMockZenPings() {
return false;
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false).build();
}
public void testSimpleMinimumMasterNodes() throws Exception {

View File

@ -34,6 +34,7 @@ import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect;
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
@ -62,8 +63,9 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
}
@Override
protected boolean addMockZenPings() {
return false;
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false).build();
}
private void createStaleReplicaScenario() throws Exception {

View File

@ -23,6 +23,8 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
@ -39,15 +41,22 @@ import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.NoopDiscovery;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
import org.junit.Before;
import org.mockito.Mock;
import org.mockito.Mockito;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class DiscoveryModuleTests extends ESTestCase {
private TransportService transportService;
private ClusterService clusterService;
private ThreadPool threadPool;
public interface DummyHostsProviderPlugin extends DiscoveryPlugin {
Map<String, Supplier<UnicastHostsProvider>> impl();
@ -62,52 +71,51 @@ public class DiscoveryModuleTests extends ESTestCase {
Map<String, Supplier<Discovery>> impl();
@Override
default Map<String, Supplier<Discovery>> getDiscoveryTypes(ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, ZenPing zenPing) {
ClusterService clusterService, UnicastHostsProvider hostsProvider) {
return impl();
}
}
@Before
public void setupDummyServices() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, null, null);
clusterService = new ClusterService(Settings.EMPTY, clusterSettings, null);
clusterService = mock(ClusterService.class);
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
threadPool = mock(ThreadPool.class);
}
@After
public void clearDummyServices() throws IOException {
IOUtils.close(transportService, clusterService);
transportService = null;
clusterService = null;
IOUtils.close(transportService);
}
private DiscoveryModule newModule(Settings settings, Function<UnicastHostsProvider, ZenPing> createZenPing,
List<DiscoveryPlugin> plugins) {
return new DiscoveryModule(settings, null, transportService, null, clusterService, createZenPing, plugins);
private DiscoveryModule newModule(Settings settings, List<DiscoveryPlugin> plugins) {
return new DiscoveryModule(settings, threadPool, transportService, null, clusterService, plugins);
}
public void testDefaults() {
DiscoveryModule module = newModule(Settings.EMPTY, hostsProvider -> null, Collections.emptyList());
DiscoveryModule module = newModule(Settings.EMPTY, Collections.emptyList());
assertTrue(module.getDiscovery() instanceof ZenDiscovery);
}
public void testLazyConstructionDiscovery() {
DummyDiscoveryPlugin plugin = () -> Collections.singletonMap("custom",
() -> { throw new AssertionError("created discovery type which was not selected"); });
newModule(Settings.EMPTY, hostsProvider -> null, Collections.singletonList(plugin));
newModule(Settings.EMPTY, Collections.singletonList(plugin));
}
public void testRegisterDiscovery() {
Settings settings = Settings.builder().put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "custom").build();
DummyDiscoveryPlugin plugin = () -> Collections.singletonMap("custom", NoopDiscovery::new);
DiscoveryModule module = newModule(settings, hostsProvider -> null, Collections.singletonList(plugin));
DiscoveryModule module = newModule(settings, Collections.singletonList(plugin));
assertTrue(module.getDiscovery() instanceof NoopDiscovery);
}
public void testUnknownDiscovery() {
Settings settings = Settings.builder().put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "dne").build();
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
newModule(settings, hostsProvider -> null, Collections.emptyList()));
newModule(settings, Collections.emptyList()));
assertEquals("Unknown discovery type [dne]", e.getMessage());
}
@ -115,24 +123,26 @@ public class DiscoveryModuleTests extends ESTestCase {
DummyDiscoveryPlugin plugin1 = () -> Collections.singletonMap("dup", () -> null);
DummyDiscoveryPlugin plugin2 = () -> Collections.singletonMap("dup", () -> null);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
newModule(Settings.EMPTY, hostsProvider -> null, Arrays.asList(plugin1, plugin2)));
newModule(Settings.EMPTY, Arrays.asList(plugin1, plugin2)));
assertEquals("Cannot register discovery type [dup] twice", e.getMessage());
}
public void testHostsProvider() {
Settings settings = Settings.builder().put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "custom").build();
final UnicastHostsProvider provider = Collections::emptyList;
DummyHostsProviderPlugin plugin = () -> Collections.singletonMap("custom", () -> provider);
newModule(settings, hostsProvider -> {
assertEquals(provider, hostsProvider);
return null;
}, Collections.singletonList(plugin));
AtomicBoolean created = new AtomicBoolean(false);
DummyHostsProviderPlugin plugin = () -> Collections.singletonMap("custom", () -> {
created.set(true);
return Collections::emptyList;
});
newModule(settings, Collections.singletonList(plugin));
assertTrue(created.get());
}
public void testUnknownHostsProvider() {
Settings settings = Settings.builder().put(DiscoveryModule.DISCOVERY_HOSTS_PROVIDER_SETTING.getKey(), "dne").build();
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
newModule(settings, hostsProvider -> null, Collections.emptyList()));
newModule(settings, Collections.emptyList()));
assertEquals("Unknown zen hosts provider [dne]", e.getMessage());
}
@ -140,13 +150,13 @@ public class DiscoveryModuleTests extends ESTestCase {
DummyHostsProviderPlugin plugin1 = () -> Collections.singletonMap("dup", () -> null);
DummyHostsProviderPlugin plugin2 = () -> Collections.singletonMap("dup", () -> null);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
newModule(Settings.EMPTY, hostsProvider -> null, Arrays.asList(plugin1, plugin2)));
newModule(Settings.EMPTY, Arrays.asList(plugin1, plugin2)));
assertEquals("Cannot register zen hosts provider [dup] twice", e.getMessage());
}
public void testLazyConstructionHostsProvider() {
DummyHostsProviderPlugin plugin = () -> Collections.singletonMap("custom",
() -> { throw new AssertionError("created hosts provider which was not selected"); });
newModule(Settings.EMPTY, hostsProvider -> null, Collections.singletonList(plugin));
newModule(Settings.EMPTY, Collections.singletonList(plugin));
}
}

View File

@ -67,6 +67,7 @@ import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.discovery.ClusterDiscoveryConfiguration;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.test.disruption.BlockClusterStateProcessing;
import org.elasticsearch.test.disruption.IntermittentLongGCDisruption;
import org.elasticsearch.test.disruption.LongGCDisruption;
@ -129,14 +130,10 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
private ClusterDiscoveryConfiguration discoveryConfig;
@Override
protected boolean addMockZenPings() {
return false;
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return discoveryConfig.nodeSettings(nodeOrdinal);
return Settings.builder().put(discoveryConfig.nodeSettings(nodeOrdinal))
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false).build();
}
@Before
@ -196,7 +193,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
ensureStableCluster(numberOfNodes);
// TODO: this is a temporary solution so that nodes will not base their reaction to a partition based on previous successful results
ZenPing zenPing = internalCluster().getInstance(ZenPing.class);
ZenPing zenPing = ((TestZenDiscovery)internalCluster().getInstance(Discovery.class)).getZenPing();
if (zenPing instanceof UnicastZenPing) {
((UnicastZenPing) zenPing).clearTemporalResponses();
}
@ -856,7 +853,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
// Forcefully clean temporal response lists on all nodes. Otherwise the node in the unicast host list
// includes all the other nodes that have pinged it and the issue doesn't manifest
ZenPing zenPing = internalCluster().getInstance(ZenPing.class);
ZenPing zenPing = ((TestZenDiscovery)internalCluster().getInstance(Discovery.class)).getZenPing();
if (zenPing instanceof UnicastZenPing) {
((UnicastZenPing) zenPing).clearTemporalResponses();
}
@ -893,7 +890,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
// Forcefully clean temporal response lists on all nodes. Otherwise the node in the unicast host list
// includes all the other nodes that have pinged it and the issue doesn't manifest
ZenPing zenPing = internalCluster().getInstance(ZenPing.class);
ZenPing zenPing = ((TestZenDiscovery)internalCluster().getInstance(Discovery.class)).getZenPing();
if (zenPing instanceof UnicastZenPing) {
((UnicastZenPing) zenPing).clearTemporalResponses();
}

View File

@ -19,6 +19,16 @@
package org.elasticsearch.discovery.zen;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
@ -34,22 +44,11 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.PublishClusterStateActionTests.AssertingAckListener;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.discovery.MockZenPing;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
@ -269,8 +268,7 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
}
private ZenDiscovery buildZenDiscovery(Settings settings, TransportService service, ClusterService clusterService, ThreadPool threadPool) {
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service, clusterService, clusterSettings, new MockZenPing(settings));
ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service, clusterService, Collections::emptyList);
zenDiscovery.start();
return zenDiscovery;
}

View File

@ -50,6 +50,7 @@ import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.test.disruption.BlockClusterStateProcessing;
import org.elasticsearch.test.junit.annotations.TestLogging;
@ -76,8 +77,9 @@ import static org.hamcrest.Matchers.instanceOf;
public class RareClusterStateIT extends ESIntegTestCase {
@Override
protected boolean addMockZenPings() {
return false;
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false).build();
}
@Override

View File

@ -74,10 +74,10 @@ public class AzureDiscoveryPlugin extends Plugin implements DiscoveryPlugin {
@Override
public Map<String, Supplier<Discovery>> getDiscoveryTypes(ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, ZenPing zenPing) {
ClusterService clusterService, UnicastHostsProvider hostsProvider) {
// this is for backcompat with pre 5.1, where users would set discovery.type to use ec2 hosts provider
return Collections.singletonMap(AZURE, () ->
new ZenDiscovery(settings, threadPool, transportService, clusterService, clusterService.getClusterSettings(), zenPing));
new ZenDiscovery(settings, threadPool, transportService, clusterService, hostsProvider));
}
@Override

View File

@ -101,10 +101,10 @@ public class Ec2DiscoveryPlugin extends Plugin implements DiscoveryPlugin, Close
@Override
public Map<String, Supplier<Discovery>> getDiscoveryTypes(ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, ZenPing zenPing) {
ClusterService clusterService, UnicastHostsProvider hostsProvider) {
// this is for backcompat with pre 5.1, where users would set discovery.type to use ec2 hosts provider
return Collections.singletonMap(EC2, () ->
new ZenDiscovery(settings, threadPool, transportService, clusterService, clusterService.getClusterSettings(), zenPing));
new ZenDiscovery(settings, threadPool, transportService, clusterService, hostsProvider));
}
@Override

View File

@ -99,10 +99,10 @@ public class GceDiscoveryPlugin extends Plugin implements DiscoveryPlugin, Close
@Override
public Map<String, Supplier<Discovery>> getDiscoveryTypes(ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, ZenPing zenPing) {
ClusterService clusterService, UnicastHostsProvider hostsProvider) {
// this is for backcompat with pre 5.1, where users would set discovery.type to use ec2 hosts provider
return Collections.singletonMap(GCE, () ->
new ZenDiscovery(settings, threadPool, transportService, clusterService, clusterService.getClusterSettings(), zenPing));
new ZenDiscovery(settings, threadPool, transportService, clusterService, hostsProvider));
}
@Override

View File

@ -19,6 +19,11 @@
package org.elasticsearch.tribe;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
@ -34,16 +39,11 @@ import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.discovery.MockZenPing;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.transport.MockTcpTransportPlugin;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import static org.hamcrest.CoreMatchers.either;
import static org.hamcrest.CoreMatchers.equalTo;
@ -68,7 +68,7 @@ public class TribeUnitTests extends ESTestCase {
.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), 2)
.build();
final List<Class<? extends Plugin>> mockPlugins = Arrays.asList(MockTcpTransportPlugin.class, MockZenPing.TestPlugin.class);
final List<Class<? extends Plugin>> mockPlugins = Arrays.asList(MockTcpTransportPlugin.class, TestZenDiscovery.TestPlugin.class);
tribe1 = new MockNode(
Settings.builder()
.put(baseSettings)
@ -110,7 +110,7 @@ public class TribeUnitTests extends ESTestCase {
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
.put(extraSettings).build();
try (Node node = new MockNode(settings, Arrays.asList(MockTcpTransportPlugin.class, MockZenPing.TestPlugin.class)).start()) {
try (Node node = new MockNode(settings, Arrays.asList(MockTcpTransportPlugin.class, TestZenDiscovery.TestPlugin.class)).start()) {
try (Client client = node.client()) {
assertBusy(() -> {
ClusterState state = client.admin().cluster().prepareState().clear().setNodes(true).get().getState();

View File

@ -102,16 +102,6 @@ public class MockNode extends Node {
}
}
@Override
protected ZenPing newZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
UnicastHostsProvider hostsProvider) {
if (getPluginsService().filterPlugins(MockZenPing.TestPlugin.class).isEmpty()) {
return super.newZenPing(settings, threadPool, transportService, hostsProvider);
} else {
return new MockZenPing(settings);
}
}
@Override
protected Node newTribeClientNode(Settings settings, Collection<Class<? extends Plugin>> classpathPlugins) {
return new MockNode(settings, classpathPlugins);

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.junit.listeners.LoggingListener;
@ -204,11 +205,6 @@ public abstract class ESBackcompatTestCase extends ESIntegTestCase {
return finalSettings.build();
}
@Override
protected boolean addMockZenPings() {
return false;
}
protected int minExternalNodes() { return 1; }
protected int maxExternalNodes() {
@ -246,6 +242,7 @@ public abstract class ESBackcompatTestCase extends ESIntegTestCase {
protected Settings commonNodeSettings(int nodeOrdinal) {
Settings.Builder builder = Settings.builder().put(requiredSettings());
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, randomBoolean() ? "netty3" : "netty4"); // run same transport / disco as external
builder.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false);
return builder.build();
}

View File

@ -119,7 +119,7 @@ import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.MockSearchService;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.client.RandomizingClient;
import org.elasticsearch.test.discovery.MockZenPing;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.store.MockFSIndexStore;
import org.elasticsearch.test.transport.MockTransportService;
@ -1805,10 +1805,6 @@ public abstract class ESIntegTestCase extends ESTestCase {
return true;
}
protected boolean addMockZenPings() {
return true;
}
/**
* Returns a function that allows to wrap / filter all clients that are exposed by the test cluster. This is useful
* for debugging or request / response pre and post processing. It also allows to intercept all calls done by the test
@ -1846,9 +1842,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
mocks.add(MockTcpTransportPlugin.class);
}
if (addMockZenPings()) {
mocks.add(MockZenPing.TestPlugin.class);
}
mocks.add(TestZenDiscovery.TestPlugin.class);
mocks.add(TestSeedPlugin.class);
return Collections.unmodifiableList(mocks);
}

View File

@ -18,6 +18,13 @@
*/
package org.elasticsearch.test;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
@ -45,7 +52,7 @@ import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.test.discovery.MockZenPing;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.MockTcpTransportPlugin;
import org.junit.After;
@ -53,13 +60,6 @@ import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@ -191,9 +191,9 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
plugins = new ArrayList<>(plugins);
plugins.add(MockTcpTransportPlugin.class);
}
if (plugins.contains(MockZenPing.TestPlugin.class) == false) {
if (plugins.contains(TestZenDiscovery.TestPlugin.class) == false) {
plugins = new ArrayList<>(plugins);
plugins.add(MockZenPing.TestPlugin.class);
plugins.add(TestZenDiscovery.TestPlugin.class);
}
Node build = new MockNode(settings, plugins);
try {

View File

@ -18,39 +18,30 @@
*/
package org.elasticsearch.test.discovery;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.zen.PingContextProvider;
import org.elasticsearch.discovery.zen.ZenPing;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.plugins.Plugin;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.zen.PingContextProvider;
import org.elasticsearch.discovery.zen.ZenPing;
/**
* A {@link ZenPing} implementation which returns results based on an static in-memory map. This allows pinging
* to be immediate and can be used to speed up tests.
*/
public final class MockZenPing extends AbstractComponent implements ZenPing {
/** A marker plugin used by {@link org.elasticsearch.node.MockNode} to indicate this mock zen ping should be used. */
public static class TestPlugin extends Plugin {}
static final Map<ClusterName, Set<MockZenPing>> activeNodesPerCluster = ConcurrentCollections.newConcurrentMap();
private volatile PingContextProvider contextProvider;
@Inject
public MockZenPing(Settings settings) {
super(settings);
}

View File

@ -0,0 +1,91 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.discovery;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.ZenPing;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
/**
* A alternative zen discovery which allows using mocks for things like pings, as well as
* giving access to internals.
*/
public class TestZenDiscovery extends ZenDiscovery {
public static final Setting<Boolean> USE_MOCK_PINGS =
Setting.boolSetting("discovery.zen.use_mock_pings", true, Setting.Property.NodeScope);
/** A plugin which installs mock discovery and configures it to be used. */
public static class TestPlugin extends Plugin implements DiscoveryPlugin {
private Settings settings;
public TestPlugin(Settings settings) {
this.settings = settings;
}
@Override
public Map<String, Supplier<Discovery>> getDiscoveryTypes(ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, UnicastHostsProvider hostsProvider) {
return Collections.singletonMap("test-zen",
() -> new TestZenDiscovery(settings, threadPool, transportService, clusterService, hostsProvider));
}
@Override
public List<Setting<?>> getSettings() {
return Collections.singletonList(USE_MOCK_PINGS);
}
@Override
public Settings additionalSettings() {
return Settings.builder().put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), "test-zen").build();
}
}
private TestZenDiscovery(Settings settings, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, UnicastHostsProvider hostsProvider) {
super(settings, threadPool, transportService, clusterService, hostsProvider);
}
@Override
protected ZenPing newZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
UnicastHostsProvider hostsProvider) {
if (USE_MOCK_PINGS.get(settings)) {
return new MockZenPing(settings);
} else {
return super.newZenPing(settings, threadPool, transportService, hostsProvider);
}
}
public ZenPing getZenPing() {
return zenPing;
}
}

View File

@ -19,24 +19,6 @@
*/
package org.elasticsearch.test.test;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.NodeConfigurationSource;
import org.elasticsearch.test.discovery.MockZenPing;
import org.elasticsearch.transport.MockTcpTransportPlugin;
import org.elasticsearch.transport.TransportSettings;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
@ -52,6 +34,24 @@ import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.NodeConfigurationSource;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.transport.MockTcpTransportPlugin;
import org.elasticsearch.transport.TransportSettings;
import static org.elasticsearch.cluster.node.DiscoveryNode.Role.DATA;
import static org.elasticsearch.cluster.node.DiscoveryNode.Role.INGEST;
import static org.elasticsearch.cluster.node.DiscoveryNode.Role.MASTER;
@ -155,7 +155,7 @@ public class InternalTestClusterTests extends ESTestCase {
String nodePrefix = "foobar";
Path baseDir = createTempDir();
final List<Class<? extends Plugin>> mockPlugins = Arrays.asList(MockTcpTransportPlugin.class, MockZenPing.TestPlugin.class);
final List<Class<? extends Plugin>> mockPlugins = Arrays.asList(MockTcpTransportPlugin.class, TestZenDiscovery.TestPlugin.class);
InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, baseDir, masterNodes,
minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes,
enableHttpPipelining, nodePrefix, mockPlugins, Function.identity());
@ -218,7 +218,7 @@ public class InternalTestClusterTests extends ESTestCase {
Path baseDir = createTempDir();
InternalTestCluster cluster = new InternalTestCluster(clusterSeed, baseDir, masterNodes,
minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes,
enableHttpPipelining, nodePrefix, Arrays.asList(MockTcpTransportPlugin.class, MockZenPing.TestPlugin.class),
enableHttpPipelining, nodePrefix, Arrays.asList(MockTcpTransportPlugin.class, TestZenDiscovery.TestPlugin.class),
Function.identity());
try {
cluster.beforeTest(random(), 0.0);
@ -296,7 +296,7 @@ public class InternalTestClusterTests extends ESTestCase {
return Settings.builder()
.put(NetworkModule.TRANSPORT_TYPE_KEY, MockTcpTransportPlugin.MOCK_TCP_TRANSPORT_NAME).build();
}
}, 0, randomBoolean(), "", Arrays.asList(MockTcpTransportPlugin.class, MockZenPing.TestPlugin.class), Function.identity());
}, 0, randomBoolean(), "", Arrays.asList(MockTcpTransportPlugin.class, TestZenDiscovery.TestPlugin.class), Function.identity());
cluster.beforeTest(random(), 0.0);
try {
Map<DiscoveryNode.Role, Set<String>> pathsPerRole = new HashMap<>();