diff --git a/pom.xml b/pom.xml index 6e56664bf4a..b0b75b58932 100644 --- a/pom.xml +++ b/pom.xml @@ -16,9 +16,9 @@ - 1.4.0 - 4.10.2 - 4.10.2 + 1.5.0 + 4.10.4 + 4.10.4 auto true diff --git a/src/main/java/org/elasticsearch/watcher/input/search/SearchInput.java b/src/main/java/org/elasticsearch/watcher/input/search/SearchInput.java index 53d0c71b65a..c46747607c5 100644 --- a/src/main/java/org/elasticsearch/watcher/input/search/SearchInput.java +++ b/src/main/java/org/elasticsearch/watcher/input/search/SearchInput.java @@ -125,7 +125,7 @@ public class SearchInput extends Input { ExecutableScript script = scriptService.executable("mustache", requestSource, ScriptService.ScriptType.INLINE, templateParams); request.source((BytesReference) script.unwrap(script.run()), false); } else if (requestPrototype.templateName() != null) { - MapBuilder templateParams = MapBuilder.newMapBuilder(requestPrototype.templateParams()) + MapBuilder templateParams = MapBuilder.newMapBuilder(requestPrototype.templateParams()) .put(Variables.SCHEDULED_FIRE_TIME, formatDate(scheduledFireTime)) .put(Variables.FIRE_TIME, formatDate(fireTime)) .put(Variables.EXECUTION_TIME, formatDate(executionTime)); diff --git a/src/main/java/org/elasticsearch/watcher/support/WatcherUtils.java b/src/main/java/org/elasticsearch/watcher/support/WatcherUtils.java index 67d7646d3c8..8471ff1e816 100644 --- a/src/main/java/org/elasticsearch/watcher/support/WatcherUtils.java +++ b/src/main/java/org/elasticsearch/watcher/support/WatcherUtils.java @@ -229,13 +229,13 @@ public final class WatcherUtils { return builder.endObject(); } - public static Map flattenModel(Map map) { - Map result = new HashMap<>(); + public static Map flattenModel(Map map) { + Map result = new HashMap<>(); flattenModel("", map, result); return result; } - private static void flattenModel(String key, Object value, Map result) { + private static void flattenModel(String key, Object value, Map result) { if (value instanceof Map) { for (Map.Entry entry : ((Map) value).entrySet()) { if ("".equals(key)) { diff --git a/src/main/java/org/elasticsearch/watcher/support/init/proxy/ScriptServiceProxy.java b/src/main/java/org/elasticsearch/watcher/support/init/proxy/ScriptServiceProxy.java index da4fab826e7..689ad6cca41 100644 --- a/src/main/java/org/elasticsearch/watcher/support/init/proxy/ScriptServiceProxy.java +++ b/src/main/java/org/elasticsearch/watcher/support/init/proxy/ScriptServiceProxy.java @@ -5,13 +5,12 @@ */ package org.elasticsearch.watcher.support.init.proxy; -import org.elasticsearch.watcher.support.init.InitializingService; import org.elasticsearch.common.inject.Injector; -import org.elasticsearch.script.CompiledScript; import org.elasticsearch.script.ExecutableScript; import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.SearchScript; import org.elasticsearch.search.lookup.SearchLookup; +import org.elasticsearch.watcher.support.init.InitializingService; import java.util.Map; @@ -41,10 +40,6 @@ public class ScriptServiceProxy implements InitializingService.Initializable { return service.executable(lang, script, scriptType, vars); } - public SearchScript search(CompiledScript compiledScript, SearchLookup lookup, Map vars) { - return service.search(compiledScript, lookup, vars); - } - public SearchScript search(SearchLookup lookup, String lang, String script, ScriptService.ScriptType scriptType, Map vars) { return service.search(lookup, lang, script, scriptType, vars); } diff --git a/src/main/java/org/elasticsearch/watcher/transform/SearchTransform.java b/src/main/java/org/elasticsearch/watcher/transform/SearchTransform.java index c17b1968a43..1e3ba650837 100644 --- a/src/main/java/org/elasticsearch/watcher/transform/SearchTransform.java +++ b/src/main/java/org/elasticsearch/watcher/transform/SearchTransform.java @@ -98,7 +98,7 @@ public class SearchTransform extends Transform { ExecutableScript script = scriptService.executable("mustache", requestSource, ScriptService.ScriptType.INLINE, createCtxModel(ctx, payload)); request.source((BytesReference) script.unwrap(script.run()), false); } else if (requestPrototype.templateName() != null) { - MapBuilder templateParams = MapBuilder.newMapBuilder(requestPrototype.templateParams()) + MapBuilder templateParams = MapBuilder.newMapBuilder(requestPrototype.templateParams()) .putAll(flattenModel(createCtxModel(ctx, payload))); request.templateParams(templateParams.map()); request.templateName(requestPrototype.templateName()); diff --git a/src/test/java/org/elasticsearch/discovery/local/LocalDiscovery.java b/src/test/java/org/elasticsearch/discovery/local/LocalDiscovery.java deleted file mode 100644 index 9701c8b8175..00000000000 --- a/src/test/java/org/elasticsearch/discovery/local/LocalDiscovery.java +++ /dev/null @@ -1,381 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.discovery.local; - -import com.google.common.base.Objects; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ElasticsearchIllegalStateException; -import org.elasticsearch.Version; -import org.elasticsearch.cluster.*; -import org.elasticsearch.cluster.block.ClusterBlocks; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodeService; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.allocation.AllocationService; -import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.inject.internal.Nullable; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.discovery.*; -import org.elasticsearch.node.service.NodeService; -import org.elasticsearch.transport.TransportService; - -import java.util.HashSet; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.elasticsearch.cluster.ClusterState.Builder; - -/** - * Forked from ES core just for testing. This class is first on the classpath and will be picked - * before the one in es core jar file. - * - * The version in core has a Guice circular dependency issue with 'DiscoveryService'. - * It doesn't need it, so in this version that has been removed. - * - * The circular dependency issue only manifests when plugin.types setting is used in tests. - */ -public class LocalDiscovery extends AbstractLifecycleComponent implements Discovery { - - private static final LocalDiscovery[] NO_MEMBERS = new LocalDiscovery[0]; - - private final TransportService transportService; - private final ClusterService clusterService; - private final DiscoveryNodeService discoveryNodeService; - private AllocationService allocationService; - private final ClusterName clusterName; - private final Version version; - - private final DiscoverySettings discoverySettings; - - private DiscoveryNode localNode; - - private volatile boolean master = false; - - private final AtomicBoolean initialStateSent = new AtomicBoolean(); - - private final CopyOnWriteArrayList initialStateListeners = new CopyOnWriteArrayList<>(); - - private static final ConcurrentMap clusterGroups = ConcurrentCollections.newConcurrentMap(); - - @Inject - public LocalDiscovery(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService, - DiscoveryNodeService discoveryNodeService, Version version, DiscoverySettings discoverySettings) { - super(settings); - this.clusterName = clusterName; - this.clusterService = clusterService; - this.transportService = transportService; - this.discoveryNodeService = discoveryNodeService; - this.version = version; - this.discoverySettings = discoverySettings; - } - - @Override - public void setNodeService(@Nullable NodeService nodeService) { - // nothing to do here - } - - @Override - public void setAllocationService(AllocationService allocationService) { - this.allocationService = allocationService; - } - - @Override - protected void doStart() throws ElasticsearchException { - synchronized (clusterGroups) { - ClusterGroup clusterGroup = clusterGroups.get(clusterName); - if (clusterGroup == null) { - clusterGroup = new ClusterGroup(); - clusterGroups.put(clusterName, clusterGroup); - } - logger.debug("Connected to cluster [{}]", clusterName); - this.localNode = new DiscoveryNode(settings.get("name"), DiscoveryService.generateNodeId(settings), transportService.boundAddress().publishAddress(), - discoveryNodeService.buildAttributes(), version); - - clusterGroup.members().add(this); - - LocalDiscovery firstMaster = null; - for (LocalDiscovery localDiscovery : clusterGroup.members()) { - if (localDiscovery.localNode().masterNode()) { - firstMaster = localDiscovery; - break; - } - } - - if (firstMaster != null && firstMaster.equals(this)) { - // we are the first master (and the master) - master = true; - final LocalDiscovery master = firstMaster; - clusterService.submitStateUpdateTask("local-disco-initial_connect(master)", new ProcessedClusterStateNonMasterUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); - for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) { - nodesBuilder.put(discovery.localNode); - } - nodesBuilder.localNodeId(master.localNode().id()).masterNodeId(master.localNode().id()); - // remove the NO_MASTER block in this case - ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(discoverySettings.getNoMasterBlock()); - return ClusterState.builder(currentState).nodes(nodesBuilder).blocks(blocks).build(); - } - - @Override - public void onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - sendInitialStateEventIfNeeded(); - } - }); - } else if (firstMaster != null) { - // update as fast as we can the local node state with the new metadata (so we create indices for example) - final ClusterState masterState = firstMaster.clusterService.state(); - clusterService.submitStateUpdateTask("local-disco(detected_master)", new ClusterStateNonMasterUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - // make sure we have the local node id set, we might need it as a result of the new metadata - DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentState.nodes()).put(localNode).localNodeId(localNode.id()); - return ClusterState.builder(currentState).metaData(masterState.metaData()).nodes(nodesBuilder).build(); - } - - @Override - public void onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); - } - }); - - // tell the master to send the fact that we are here - final LocalDiscovery master = firstMaster; - firstMaster.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode + "])", new ProcessedClusterStateNonMasterUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); - for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) { - nodesBuilder.put(discovery.localNode); - } - nodesBuilder.localNodeId(master.localNode().id()).masterNodeId(master.localNode().id()); - return ClusterState.builder(currentState).nodes(nodesBuilder).build(); - } - - @Override - public void onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - sendInitialStateEventIfNeeded(); - } - }); - } - } // else, no master node, the next node that will start will fill things in... - } - - @Override - protected void doStop() throws ElasticsearchException { - synchronized (clusterGroups) { - ClusterGroup clusterGroup = clusterGroups.get(clusterName); - if (clusterGroup == null) { - logger.warn("Illegal state, should not have an empty cluster group when stopping, I should be there at teh very least..."); - return; - } - clusterGroup.members().remove(this); - if (clusterGroup.members().isEmpty()) { - // no more members, remove and return - clusterGroups.remove(clusterName); - return; - } - - LocalDiscovery firstMaster = null; - for (LocalDiscovery localDiscovery : clusterGroup.members()) { - if (localDiscovery.localNode().masterNode()) { - firstMaster = localDiscovery; - break; - } - } - - if (firstMaster != null) { - // if the removed node is the master, make the next one as the master - if (master) { - firstMaster.master = true; - } - - final Set newMembers = new HashSet<>(); - for (LocalDiscovery discovery : clusterGroup.members()) { - newMembers.add(discovery.localNode.id()); - } - - final LocalDiscovery master = firstMaster; - master.clusterService.submitStateUpdateTask("local-disco-update", new ClusterStateNonMasterUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - DiscoveryNodes newNodes = currentState.nodes().removeDeadMembers(newMembers, master.localNode.id()); - DiscoveryNodes.Delta delta = newNodes.delta(currentState.nodes()); - if (delta.added()) { - logger.warn("No new nodes should be created when a new discovery view is accepted"); - } - // reroute here, so we eagerly remove dead nodes from the routing - ClusterState updatedState = ClusterState.builder(currentState).nodes(newNodes).build(); - RoutingAllocation.Result routingResult = master.allocationService.reroute(ClusterState.builder(updatedState).build()); - return ClusterState.builder(updatedState).routingResult(routingResult).build(); - } - - @Override - public void onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); - } - }); - } - } - } - - @Override - protected void doClose() throws ElasticsearchException { - } - - @Override - public DiscoveryNode localNode() { - return localNode; - } - - @Override - public void addListener(InitialStateDiscoveryListener listener) { - this.initialStateListeners.add(listener); - } - - @Override - public void removeListener(InitialStateDiscoveryListener listener) { - this.initialStateListeners.remove(listener); - } - - @Override - public String nodeDescription() { - return clusterName.value() + "/" + localNode.id(); - } - - public void publish(ClusterState clusterState, final Discovery.AckListener ackListener) { - if (!master) { - throw new ElasticsearchIllegalStateException("Shouldn't publish state when not master"); - } - LocalDiscovery[] members = members(); - if (members.length > 0) { - publish(members, clusterState, new AckClusterStatePublishResponseHandler(members.length - 1, ackListener)); - } - } - - private LocalDiscovery[] members() { - ClusterGroup clusterGroup = clusterGroups.get(clusterName); - if (clusterGroup == null) { - return NO_MEMBERS; - } - Queue members = clusterGroup.members(); - return members.toArray(new LocalDiscovery[members.size()]); - } - - private void publish(LocalDiscovery[] members, ClusterState clusterState, final ClusterStatePublishResponseHandler publishResponseHandler) { - - try { - // we do the marshaling intentionally, to check it works well... - final byte[] clusterStateBytes = Builder.toBytes(clusterState); - - for (final LocalDiscovery discovery : members) { - if (discovery.master) { - continue; - } - final ClusterState nodeSpecificClusterState = ClusterState.Builder.fromBytes(clusterStateBytes, discovery.localNode, clusterName); - nodeSpecificClusterState.status(ClusterState.ClusterStateStatus.RECEIVED); - // ignore cluster state messages that do not include "me", not in the game yet... - if (nodeSpecificClusterState.nodes().localNode() != null) { - assert nodeSpecificClusterState.nodes().masterNode() != null : "received a cluster state without a master"; - assert !nodeSpecificClusterState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock()) : "received a cluster state with a master block"; - - discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateNonMasterUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - if (nodeSpecificClusterState.version() < currentState.version() && Objects.equal(nodeSpecificClusterState.nodes().masterNodeId(), currentState.nodes().masterNodeId())) { - return currentState; - } - - if (currentState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock())) { - // its a fresh update from the master as we transition from a start of not having a master to having one - logger.debug("got first state from fresh master [{}]", nodeSpecificClusterState.nodes().masterNodeId()); - return nodeSpecificClusterState; - } - - ClusterState.Builder builder = ClusterState.builder(nodeSpecificClusterState); - // if the routing table did not change, use the original one - if (nodeSpecificClusterState.routingTable().version() == currentState.routingTable().version()) { - builder.routingTable(currentState.routingTable()); - } - if (nodeSpecificClusterState.metaData().version() == currentState.metaData().version()) { - builder.metaData(currentState.metaData()); - } - - return builder.build(); - } - - @Override - public void onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); - publishResponseHandler.onFailure(discovery.localNode, t); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - sendInitialStateEventIfNeeded(); - publishResponseHandler.onResponse(discovery.localNode); - } - }); - } else { - publishResponseHandler.onResponse(discovery.localNode); - } - } - - TimeValue publishTimeout = discoverySettings.getPublishTimeout(); - if (publishTimeout.millis() > 0) { - try { - boolean awaited = publishResponseHandler.awaitAllNodes(publishTimeout); - if (!awaited) { - logger.debug("awaiting all nodes to process published state {} timed out, timeout {}", clusterState.version(), publishTimeout); - } - } catch (InterruptedException e) { - // ignore & restore interrupt - Thread.currentThread().interrupt(); - } - } - - - } catch (Exception e) { - // failure to marshal or un-marshal - throw new ElasticsearchIllegalStateException("Cluster state failed to serialize", e); - } - } - - private void sendInitialStateEventIfNeeded() { - if (initialStateSent.compareAndSet(false, true)) { - for (InitialStateDiscoveryListener listener : initialStateListeners) { - listener.initialStateProcessed(); - } - } - } - - private class ClusterGroup { - - private Queue members = ConcurrentCollections.newQueue(); - - Queue members() { - return members; - } - } -} diff --git a/src/test/java/org/elasticsearch/watcher/condition/script/ScriptConditionSearchTests.java b/src/test/java/org/elasticsearch/watcher/condition/script/ScriptConditionSearchTests.java index ed28aa7ef34..51f2ff9abf5 100644 --- a/src/test/java/org/elasticsearch/watcher/condition/script/ScriptConditionSearchTests.java +++ b/src/test/java/org/elasticsearch/watcher/condition/script/ScriptConditionSearchTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.watcher.condition.script; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.watcher.watch.WatchExecutionContext; import org.elasticsearch.watcher.watch.Payload; import org.elasticsearch.watcher.support.Script; @@ -47,13 +48,16 @@ public class ScriptConditionSearchTests extends AbstractWatcherSingleNodeTests { private ScriptServiceProxy scriptService; @Before - public void init() { + public void init() throws Exception { tp = new ThreadPool(ThreadPool.Names.SAME); - Settings settings = ImmutableSettings.settingsBuilder().build(); + Settings settings = ImmutableSettings.settingsBuilder() + .put(ScriptService.DISABLE_DYNAMIC_SCRIPTING_SETTING, "none") + .build(); GroovyScriptEngineService groovyScriptEngineService = new GroovyScriptEngineService(settings); Set engineServiceSet = new HashSet<>(); engineServiceSet.add(groovyScriptEngineService); - scriptService = ScriptServiceProxy.of(new ScriptService(settings, new Environment(), engineServiceSet, new ResourceWatcherService(settings, tp))); + NodeSettingsService nodeSettingsService = new NodeSettingsService(settings); + scriptService = ScriptServiceProxy.of(new ScriptService(settings, new Environment(), engineServiceSet, new ResourceWatcherService(settings, tp), nodeSettingsService)); } @After diff --git a/src/test/java/org/elasticsearch/watcher/condition/script/ScriptConditionTests.java b/src/test/java/org/elasticsearch/watcher/condition/script/ScriptConditionTests.java index 80c16b036c9..5c3f9f86e37 100644 --- a/src/test/java/org/elasticsearch/watcher/condition/script/ScriptConditionTests.java +++ b/src/test/java/org/elasticsearch/watcher/condition/script/ScriptConditionTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.watcher.condition.script; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.watcher.WatcherSettingsException; import org.elasticsearch.watcher.watch.WatchExecutionContext; import org.elasticsearch.watcher.watch.Payload; @@ -147,12 +148,15 @@ public class ScriptConditionTests extends ElasticsearchTestCase { fail("expected a condition exception trying to parse an invalid condition XContent"); } - private static ScriptServiceProxy getScriptServiceProxy(ThreadPool tp) { - Settings settings = ImmutableSettings.settingsBuilder().build(); + private static ScriptServiceProxy getScriptServiceProxy(ThreadPool tp) throws IOException { + Settings settings = ImmutableSettings.settingsBuilder() + .put(ScriptService.DISABLE_DYNAMIC_SCRIPTING_SETTING, "none") + .build(); GroovyScriptEngineService groovyScriptEngineService = new GroovyScriptEngineService(settings); Set engineServiceSet = new HashSet<>(); engineServiceSet.add(groovyScriptEngineService); - return ScriptServiceProxy.of(new ScriptService(settings, new Environment(), engineServiceSet, new ResourceWatcherService(settings, tp))); + NodeSettingsService nodeSettingsService = new NodeSettingsService(settings); + return ScriptServiceProxy.of(new ScriptService(settings, new Environment(), engineServiceSet, new ResourceWatcherService(settings, tp), nodeSettingsService)); } private static XContentBuilder createConditionContent(String script, String scriptLang, ScriptService.ScriptType scriptType) throws IOException { diff --git a/src/test/java/org/elasticsearch/watcher/support/WatcherUtilsTests.java b/src/test/java/org/elasticsearch/watcher/support/WatcherUtilsTests.java index d68ff25fa1c..449fa650ed5 100644 --- a/src/test/java/org/elasticsearch/watcher/support/WatcherUtilsTests.java +++ b/src/test/java/org/elasticsearch/watcher/support/WatcherUtilsTests.java @@ -45,7 +45,8 @@ public class WatcherUtilsTests extends ElasticsearchTestCase { .put("d", now) .build(); - Map result = flattenModel(map); + @SuppressWarnings("unchecked") + Map result = (Map) flattenModel(map); assertThat(result.size(), is(9)); assertThat(result, hasEntry("a.a1.0", "0")); assertThat(result, hasEntry("a.a1.1", "1")); @@ -99,7 +100,7 @@ public class WatcherUtilsTests extends ElasticsearchTestCase { expectedRequest.templateName(randomAsciiOfLengthBetween(1, 5)); expectedRequest.templateType(randomFrom(ScriptService.ScriptType.values())); if (randomBoolean()) { - Map params = new HashMap<>(); + Map params = new HashMap<>(); int maxParams = randomIntBetween(1, 10); for (int i = 0; i < maxParams; i++) { params.put(randomAsciiOfLengthBetween(1, 5), randomAsciiOfLengthBetween(1, 5)); diff --git a/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java b/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java index 7a929ea84f1..43d6ff7f252 100644 --- a/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherIntegrationTests.java @@ -10,26 +10,6 @@ import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.watcher.watch.Watch; -import org.elasticsearch.watcher.WatcherPlugin; -import org.elasticsearch.watcher.watch.WatchService; -import org.elasticsearch.watcher.actions.email.service.Authentication; -import org.elasticsearch.watcher.actions.email.service.Email; -import org.elasticsearch.watcher.actions.email.service.EmailService; -import org.elasticsearch.watcher.actions.email.service.Profile; -import org.elasticsearch.watcher.actions.webhook.HttpClient; -import org.elasticsearch.watcher.client.WatcherClient; -import org.elasticsearch.watcher.history.WatchRecord; -import org.elasticsearch.watcher.history.HistoryStore; -import org.elasticsearch.watcher.scheduler.Scheduler; -import org.elasticsearch.watcher.scheduler.SchedulerMock; -import org.elasticsearch.watcher.scheduler.schedule.Schedule; -import org.elasticsearch.watcher.scheduler.schedule.Schedules; -import org.elasticsearch.watcher.support.WatcherUtils; -import org.elasticsearch.watcher.support.clock.ClockMock; -import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy; -import org.elasticsearch.watcher.support.template.Template; -import org.elasticsearch.watcher.transport.actions.stats.WatcherStatsResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; @@ -48,6 +28,26 @@ import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.TestCluster; +import org.elasticsearch.watcher.WatcherPlugin; +import org.elasticsearch.watcher.actions.email.service.Authentication; +import org.elasticsearch.watcher.actions.email.service.Email; +import org.elasticsearch.watcher.actions.email.service.EmailService; +import org.elasticsearch.watcher.actions.email.service.Profile; +import org.elasticsearch.watcher.actions.webhook.HttpClient; +import org.elasticsearch.watcher.client.WatcherClient; +import org.elasticsearch.watcher.history.HistoryStore; +import org.elasticsearch.watcher.history.WatchRecord; +import org.elasticsearch.watcher.scheduler.Scheduler; +import org.elasticsearch.watcher.scheduler.SchedulerMock; +import org.elasticsearch.watcher.scheduler.schedule.Schedule; +import org.elasticsearch.watcher.scheduler.schedule.Schedules; +import org.elasticsearch.watcher.support.WatcherUtils; +import org.elasticsearch.watcher.support.clock.ClockMock; +import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy; +import org.elasticsearch.watcher.support.template.Template; +import org.elasticsearch.watcher.transport.actions.stats.WatcherStatsResponse; +import org.elasticsearch.watcher.watch.Watch; +import org.elasticsearch.watcher.watch.WatchService; import org.junit.After; import org.junit.Before; @@ -352,6 +352,7 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg return (InternalTestCluster) ((WatcherWrappingCluster) cluster()).testCluster; } + // We need this custom impl, because we have custom wipe logic. We don't want the watcher index templates to get deleted between tests private final class WatcherWrappingCluster extends TestCluster { private final TestCluster testCluster; @@ -411,11 +412,6 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg return testCluster.numDataAndMasterNodes(); } - @Override - public int numBenchNodes() { - return testCluster.numBenchNodes(); - } - @Override public InetSocketAddress[] httpAddresses() { return testCluster.httpAddresses(); @@ -423,45 +419,6 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg @Override public void close() throws IOException { - InternalTestCluster _testCluster = (InternalTestCluster) testCluster; - Set nodes = new HashSet<>(Arrays.asList(_testCluster.getNodeNames())); - String masterNode = _testCluster.getMasterName(); - nodes.remove(masterNode); - - // First manually stop watcher on non elected master node, this will prevent that watcher becomes active - // on these nodes - for (String node : nodes) { - WatchService watchService = _testCluster.getInstance(WatchService.class, node); - assertThat(watchService.state(), equalTo(WatchService.State.STOPPED)); - watchService.stop(); // Prevents these nodes from starting watcher when new elected master node is picked. - } - - // Then stop watcher on elected master node and wait until watcher has stopped on it. - final WatchService watchService = _testCluster.getInstance(WatchService.class, masterNode); - try { - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(watchService.state(), not(equalTo(WatchService.State.STARTING))); - } - }); - } catch (Exception e) { - throw new RuntimeException(e); - } - watchService.stop(); - try { - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(watchService.state(), equalTo(WatchService.State.STOPPED)); - } - }); - } catch (Exception e) { - throw new RuntimeException(e); - } - - // Now when can close nodes, without watcher trying to become active while nodes briefly become master - // during cluster shutdown. testCluster.close(); } diff --git a/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherSingleNodeTests.java b/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherSingleNodeTests.java index 1ac5ecd557c..1b3e63a44ba 100644 --- a/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherSingleNodeTests.java +++ b/src/test/java/org/elasticsearch/watcher/test/AbstractWatcherSingleNodeTests.java @@ -5,17 +5,12 @@ */ package org.elasticsearch.watcher.test; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.test.ElasticsearchSingleNodeTest; import org.elasticsearch.watcher.WatcherLifeCycleService; import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy; -import org.elasticsearch.client.Requests; -import org.elasticsearch.common.Priority; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.test.ElasticsearchSingleNodeTest; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -23,7 +18,6 @@ import java.util.Collections; import java.util.Map; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; -import static org.hamcrest.Matchers.equalTo; /** * @@ -53,18 +47,6 @@ public abstract class AbstractWatcherSingleNodeTests extends ElasticsearchSingle return client().prepareIndex(index, type, id).setSource(doc).get(); } - protected ClusterHealthStatus ensureGreen(String... indices) { - ClusterHealthResponse actionGet = client().admin().cluster() - .health(Requests.clusterHealthRequest(indices).timeout(TimeValue.timeValueSeconds(30)).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet(); - if (actionGet.isTimedOut()) { - logger.info("ensureGreen timed out, cluster state:\n{}\n{}", client().admin().cluster().prepareState().get().getState().prettyPrint(), client().admin().cluster().preparePendingClusterTasks().get().prettyPrint()); - assertThat("timed out waiting for green state", actionGet.isTimedOut(), equalTo(false)); - } - assertThat(actionGet.getStatus(), equalTo(ClusterHealthStatus.GREEN)); - logger.debug("indices {} are green", indices.length == 0 ? "[_all]" : indices); - return actionGet.getStatus(); - } - protected RefreshResponse refresh() { RefreshResponse actionGet = client().admin().indices().prepareRefresh().execute().actionGet(); assertNoFailures(actionGet);