Set minimum binary compatibility to ES 1.5

Closes elastic/elasticsearch#160

Original commit: elastic/x-pack-elasticsearch@10da3efb1a
This commit is contained in:
Martijn van Groningen 2015-03-26 07:59:15 +01:00
parent 9994326f11
commit 6277a32b91
11 changed files with 48 additions and 486 deletions

View File

@ -16,9 +16,9 @@
</parent>
<properties>
<elasticsearch.version>1.4.0</elasticsearch.version>
<lucene.maven.version>4.10.2</lucene.maven.version>
<lucene.version>4.10.2</lucene.version>
<elasticsearch.version>1.5.0</elasticsearch.version>
<lucene.maven.version>4.10.4</lucene.maven.version>
<lucene.version>4.10.4</lucene.version>
<tests.jvms>auto</tests.jvms>
<tests.shuffle>true</tests.shuffle>

View File

@ -125,7 +125,7 @@ public class SearchInput extends Input<SearchInput.Result> {
ExecutableScript script = scriptService.executable("mustache", requestSource, ScriptService.ScriptType.INLINE, templateParams);
request.source((BytesReference) script.unwrap(script.run()), false);
} else if (requestPrototype.templateName() != null) {
MapBuilder<String, String> templateParams = MapBuilder.newMapBuilder(requestPrototype.templateParams())
MapBuilder<String, Object> templateParams = MapBuilder.newMapBuilder(requestPrototype.templateParams())
.put(Variables.SCHEDULED_FIRE_TIME, formatDate(scheduledFireTime))
.put(Variables.FIRE_TIME, formatDate(fireTime))
.put(Variables.EXECUTION_TIME, formatDate(executionTime));

View File

@ -229,13 +229,13 @@ public final class WatcherUtils {
return builder.endObject();
}
public static Map<String, String> flattenModel(Map<String, Object> map) {
Map<String, String> result = new HashMap<>();
public static Map<String, Object> flattenModel(Map<String, Object> map) {
Map<String, Object> result = new HashMap<>();
flattenModel("", map, result);
return result;
}
private static void flattenModel(String key, Object value, Map<String, String> result) {
private static void flattenModel(String key, Object value, Map<String, Object> result) {
if (value instanceof Map) {
for (Map.Entry<String, Object> entry : ((Map<String, Object>) value).entrySet()) {
if ("".equals(key)) {

View File

@ -5,13 +5,12 @@
*/
package org.elasticsearch.watcher.support.init.proxy;
import org.elasticsearch.watcher.support.init.InitializingService;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.script.CompiledScript;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.SearchScript;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.watcher.support.init.InitializingService;
import java.util.Map;
@ -41,10 +40,6 @@ public class ScriptServiceProxy implements InitializingService.Initializable {
return service.executable(lang, script, scriptType, vars);
}
public SearchScript search(CompiledScript compiledScript, SearchLookup lookup, Map<String, Object> vars) {
return service.search(compiledScript, lookup, vars);
}
public SearchScript search(SearchLookup lookup, String lang, String script, ScriptService.ScriptType scriptType, Map<String, Object> vars) {
return service.search(lookup, lang, script, scriptType, vars);
}

View File

@ -98,7 +98,7 @@ public class SearchTransform extends Transform<SearchTransform.Result> {
ExecutableScript script = scriptService.executable("mustache", requestSource, ScriptService.ScriptType.INLINE, createCtxModel(ctx, payload));
request.source((BytesReference) script.unwrap(script.run()), false);
} else if (requestPrototype.templateName() != null) {
MapBuilder<String, String> templateParams = MapBuilder.newMapBuilder(requestPrototype.templateParams())
MapBuilder<String, Object> templateParams = MapBuilder.newMapBuilder(requestPrototype.templateParams())
.putAll(flattenModel(createCtxModel(ctx, payload)));
request.templateParams(templateParams.map());
request.templateName(requestPrototype.templateName());

View File

@ -1,381 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.discovery.local;
import com.google.common.base.Objects;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.*;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.transport.TransportService;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.cluster.ClusterState.Builder;
/**
* Forked from ES core just for testing. This class is first on the classpath and will be picked
* before the one in es core jar file.
*
* The version in core has a Guice circular dependency issue with 'DiscoveryService'.
* It doesn't need it, so in this version that has been removed.
*
* The circular dependency issue only manifests when plugin.types setting is used in tests.
*/
public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implements Discovery {
private static final LocalDiscovery[] NO_MEMBERS = new LocalDiscovery[0];
private final TransportService transportService;
private final ClusterService clusterService;
private final DiscoveryNodeService discoveryNodeService;
private AllocationService allocationService;
private final ClusterName clusterName;
private final Version version;
private final DiscoverySettings discoverySettings;
private DiscoveryNode localNode;
private volatile boolean master = false;
private final AtomicBoolean initialStateSent = new AtomicBoolean();
private final CopyOnWriteArrayList<InitialStateDiscoveryListener> initialStateListeners = new CopyOnWriteArrayList<>();
private static final ConcurrentMap<ClusterName, ClusterGroup> clusterGroups = ConcurrentCollections.newConcurrentMap();
@Inject
public LocalDiscovery(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService,
DiscoveryNodeService discoveryNodeService, Version version, DiscoverySettings discoverySettings) {
super(settings);
this.clusterName = clusterName;
this.clusterService = clusterService;
this.transportService = transportService;
this.discoveryNodeService = discoveryNodeService;
this.version = version;
this.discoverySettings = discoverySettings;
}
@Override
public void setNodeService(@Nullable NodeService nodeService) {
// nothing to do here
}
@Override
public void setAllocationService(AllocationService allocationService) {
this.allocationService = allocationService;
}
@Override
protected void doStart() throws ElasticsearchException {
synchronized (clusterGroups) {
ClusterGroup clusterGroup = clusterGroups.get(clusterName);
if (clusterGroup == null) {
clusterGroup = new ClusterGroup();
clusterGroups.put(clusterName, clusterGroup);
}
logger.debug("Connected to cluster [{}]", clusterName);
this.localNode = new DiscoveryNode(settings.get("name"), DiscoveryService.generateNodeId(settings), transportService.boundAddress().publishAddress(),
discoveryNodeService.buildAttributes(), version);
clusterGroup.members().add(this);
LocalDiscovery firstMaster = null;
for (LocalDiscovery localDiscovery : clusterGroup.members()) {
if (localDiscovery.localNode().masterNode()) {
firstMaster = localDiscovery;
break;
}
}
if (firstMaster != null && firstMaster.equals(this)) {
// we are the first master (and the master)
master = true;
final LocalDiscovery master = firstMaster;
clusterService.submitStateUpdateTask("local-disco-initial_connect(master)", new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) {
nodesBuilder.put(discovery.localNode);
}
nodesBuilder.localNodeId(master.localNode().id()).masterNodeId(master.localNode().id());
// remove the NO_MASTER block in this case
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(discoverySettings.getNoMasterBlock());
return ClusterState.builder(currentState).nodes(nodesBuilder).blocks(blocks).build();
}
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
sendInitialStateEventIfNeeded();
}
});
} else if (firstMaster != null) {
// update as fast as we can the local node state with the new metadata (so we create indices for example)
final ClusterState masterState = firstMaster.clusterService.state();
clusterService.submitStateUpdateTask("local-disco(detected_master)", new ClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
// make sure we have the local node id set, we might need it as a result of the new metadata
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentState.nodes()).put(localNode).localNodeId(localNode.id());
return ClusterState.builder(currentState).metaData(masterState.metaData()).nodes(nodesBuilder).build();
}
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
}
});
// tell the master to send the fact that we are here
final LocalDiscovery master = firstMaster;
firstMaster.clusterService.submitStateUpdateTask("local-disco-receive(from node[" + localNode + "])", new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
for (LocalDiscovery discovery : clusterGroups.get(clusterName).members()) {
nodesBuilder.put(discovery.localNode);
}
nodesBuilder.localNodeId(master.localNode().id()).masterNodeId(master.localNode().id());
return ClusterState.builder(currentState).nodes(nodesBuilder).build();
}
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
sendInitialStateEventIfNeeded();
}
});
}
} // else, no master node, the next node that will start will fill things in...
}
@Override
protected void doStop() throws ElasticsearchException {
synchronized (clusterGroups) {
ClusterGroup clusterGroup = clusterGroups.get(clusterName);
if (clusterGroup == null) {
logger.warn("Illegal state, should not have an empty cluster group when stopping, I should be there at teh very least...");
return;
}
clusterGroup.members().remove(this);
if (clusterGroup.members().isEmpty()) {
// no more members, remove and return
clusterGroups.remove(clusterName);
return;
}
LocalDiscovery firstMaster = null;
for (LocalDiscovery localDiscovery : clusterGroup.members()) {
if (localDiscovery.localNode().masterNode()) {
firstMaster = localDiscovery;
break;
}
}
if (firstMaster != null) {
// if the removed node is the master, make the next one as the master
if (master) {
firstMaster.master = true;
}
final Set<String> newMembers = new HashSet<>();
for (LocalDiscovery discovery : clusterGroup.members()) {
newMembers.add(discovery.localNode.id());
}
final LocalDiscovery master = firstMaster;
master.clusterService.submitStateUpdateTask("local-disco-update", new ClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes newNodes = currentState.nodes().removeDeadMembers(newMembers, master.localNode.id());
DiscoveryNodes.Delta delta = newNodes.delta(currentState.nodes());
if (delta.added()) {
logger.warn("No new nodes should be created when a new discovery view is accepted");
}
// reroute here, so we eagerly remove dead nodes from the routing
ClusterState updatedState = ClusterState.builder(currentState).nodes(newNodes).build();
RoutingAllocation.Result routingResult = master.allocationService.reroute(ClusterState.builder(updatedState).build());
return ClusterState.builder(updatedState).routingResult(routingResult).build();
}
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
}
});
}
}
}
@Override
protected void doClose() throws ElasticsearchException {
}
@Override
public DiscoveryNode localNode() {
return localNode;
}
@Override
public void addListener(InitialStateDiscoveryListener listener) {
this.initialStateListeners.add(listener);
}
@Override
public void removeListener(InitialStateDiscoveryListener listener) {
this.initialStateListeners.remove(listener);
}
@Override
public String nodeDescription() {
return clusterName.value() + "/" + localNode.id();
}
public void publish(ClusterState clusterState, final Discovery.AckListener ackListener) {
if (!master) {
throw new ElasticsearchIllegalStateException("Shouldn't publish state when not master");
}
LocalDiscovery[] members = members();
if (members.length > 0) {
publish(members, clusterState, new AckClusterStatePublishResponseHandler(members.length - 1, ackListener));
}
}
private LocalDiscovery[] members() {
ClusterGroup clusterGroup = clusterGroups.get(clusterName);
if (clusterGroup == null) {
return NO_MEMBERS;
}
Queue<LocalDiscovery> members = clusterGroup.members();
return members.toArray(new LocalDiscovery[members.size()]);
}
private void publish(LocalDiscovery[] members, ClusterState clusterState, final ClusterStatePublishResponseHandler publishResponseHandler) {
try {
// we do the marshaling intentionally, to check it works well...
final byte[] clusterStateBytes = Builder.toBytes(clusterState);
for (final LocalDiscovery discovery : members) {
if (discovery.master) {
continue;
}
final ClusterState nodeSpecificClusterState = ClusterState.Builder.fromBytes(clusterStateBytes, discovery.localNode, clusterName);
nodeSpecificClusterState.status(ClusterState.ClusterStateStatus.RECEIVED);
// ignore cluster state messages that do not include "me", not in the game yet...
if (nodeSpecificClusterState.nodes().localNode() != null) {
assert nodeSpecificClusterState.nodes().masterNode() != null : "received a cluster state without a master";
assert !nodeSpecificClusterState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock()) : "received a cluster state with a master block";
discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (nodeSpecificClusterState.version() < currentState.version() && Objects.equal(nodeSpecificClusterState.nodes().masterNodeId(), currentState.nodes().masterNodeId())) {
return currentState;
}
if (currentState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock())) {
// its a fresh update from the master as we transition from a start of not having a master to having one
logger.debug("got first state from fresh master [{}]", nodeSpecificClusterState.nodes().masterNodeId());
return nodeSpecificClusterState;
}
ClusterState.Builder builder = ClusterState.builder(nodeSpecificClusterState);
// if the routing table did not change, use the original one
if (nodeSpecificClusterState.routingTable().version() == currentState.routingTable().version()) {
builder.routingTable(currentState.routingTable());
}
if (nodeSpecificClusterState.metaData().version() == currentState.metaData().version()) {
builder.metaData(currentState.metaData());
}
return builder.build();
}
@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
publishResponseHandler.onFailure(discovery.localNode, t);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
sendInitialStateEventIfNeeded();
publishResponseHandler.onResponse(discovery.localNode);
}
});
} else {
publishResponseHandler.onResponse(discovery.localNode);
}
}
TimeValue publishTimeout = discoverySettings.getPublishTimeout();
if (publishTimeout.millis() > 0) {
try {
boolean awaited = publishResponseHandler.awaitAllNodes(publishTimeout);
if (!awaited) {
logger.debug("awaiting all nodes to process published state {} timed out, timeout {}", clusterState.version(), publishTimeout);
}
} catch (InterruptedException e) {
// ignore & restore interrupt
Thread.currentThread().interrupt();
}
}
} catch (Exception e) {
// failure to marshal or un-marshal
throw new ElasticsearchIllegalStateException("Cluster state failed to serialize", e);
}
}
private void sendInitialStateEventIfNeeded() {
if (initialStateSent.compareAndSet(false, true)) {
for (InitialStateDiscoveryListener listener : initialStateListeners) {
listener.initialStateProcessed();
}
}
}
private class ClusterGroup {
private Queue<LocalDiscovery> members = ConcurrentCollections.newQueue();
Queue<LocalDiscovery> members() {
return members;
}
}
}

View File

@ -7,6 +7,7 @@ package org.elasticsearch.watcher.condition.script;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.watcher.watch.WatchExecutionContext;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.support.Script;
@ -47,13 +48,16 @@ public class ScriptConditionSearchTests extends AbstractWatcherSingleNodeTests {
private ScriptServiceProxy scriptService;
@Before
public void init() {
public void init() throws Exception {
tp = new ThreadPool(ThreadPool.Names.SAME);
Settings settings = ImmutableSettings.settingsBuilder().build();
Settings settings = ImmutableSettings.settingsBuilder()
.put(ScriptService.DISABLE_DYNAMIC_SCRIPTING_SETTING, "none")
.build();
GroovyScriptEngineService groovyScriptEngineService = new GroovyScriptEngineService(settings);
Set<ScriptEngineService> engineServiceSet = new HashSet<>();
engineServiceSet.add(groovyScriptEngineService);
scriptService = ScriptServiceProxy.of(new ScriptService(settings, new Environment(), engineServiceSet, new ResourceWatcherService(settings, tp)));
NodeSettingsService nodeSettingsService = new NodeSettingsService(settings);
scriptService = ScriptServiceProxy.of(new ScriptService(settings, new Environment(), engineServiceSet, new ResourceWatcherService(settings, tp), nodeSettingsService));
}
@After

View File

@ -7,6 +7,7 @@ package org.elasticsearch.watcher.condition.script;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.watcher.WatcherSettingsException;
import org.elasticsearch.watcher.watch.WatchExecutionContext;
import org.elasticsearch.watcher.watch.Payload;
@ -147,12 +148,15 @@ public class ScriptConditionTests extends ElasticsearchTestCase {
fail("expected a condition exception trying to parse an invalid condition XContent");
}
private static ScriptServiceProxy getScriptServiceProxy(ThreadPool tp) {
Settings settings = ImmutableSettings.settingsBuilder().build();
private static ScriptServiceProxy getScriptServiceProxy(ThreadPool tp) throws IOException {
Settings settings = ImmutableSettings.settingsBuilder()
.put(ScriptService.DISABLE_DYNAMIC_SCRIPTING_SETTING, "none")
.build();
GroovyScriptEngineService groovyScriptEngineService = new GroovyScriptEngineService(settings);
Set<ScriptEngineService> engineServiceSet = new HashSet<>();
engineServiceSet.add(groovyScriptEngineService);
return ScriptServiceProxy.of(new ScriptService(settings, new Environment(), engineServiceSet, new ResourceWatcherService(settings, tp)));
NodeSettingsService nodeSettingsService = new NodeSettingsService(settings);
return ScriptServiceProxy.of(new ScriptService(settings, new Environment(), engineServiceSet, new ResourceWatcherService(settings, tp), nodeSettingsService));
}
private static XContentBuilder createConditionContent(String script, String scriptLang, ScriptService.ScriptType scriptType) throws IOException {

View File

@ -45,7 +45,8 @@ public class WatcherUtilsTests extends ElasticsearchTestCase {
.put("d", now)
.build();
Map<String, String> result = flattenModel(map);
@SuppressWarnings("unchecked")
Map<String, String> result = (Map) flattenModel(map);
assertThat(result.size(), is(9));
assertThat(result, hasEntry("a.a1.0", "0"));
assertThat(result, hasEntry("a.a1.1", "1"));
@ -99,7 +100,7 @@ public class WatcherUtilsTests extends ElasticsearchTestCase {
expectedRequest.templateName(randomAsciiOfLengthBetween(1, 5));
expectedRequest.templateType(randomFrom(ScriptService.ScriptType.values()));
if (randomBoolean()) {
Map<String, String> params = new HashMap<>();
Map<String, Object> params = new HashMap<>();
int maxParams = randomIntBetween(1, 10);
for (int i = 0; i < maxParams; i++) {
params.put(randomAsciiOfLengthBetween(1, 5), randomAsciiOfLengthBetween(1, 5));

View File

@ -10,26 +10,6 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.watcher.watch.Watch;
import org.elasticsearch.watcher.WatcherPlugin;
import org.elasticsearch.watcher.watch.WatchService;
import org.elasticsearch.watcher.actions.email.service.Authentication;
import org.elasticsearch.watcher.actions.email.service.Email;
import org.elasticsearch.watcher.actions.email.service.EmailService;
import org.elasticsearch.watcher.actions.email.service.Profile;
import org.elasticsearch.watcher.actions.webhook.HttpClient;
import org.elasticsearch.watcher.client.WatcherClient;
import org.elasticsearch.watcher.history.WatchRecord;
import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.scheduler.Scheduler;
import org.elasticsearch.watcher.scheduler.SchedulerMock;
import org.elasticsearch.watcher.scheduler.schedule.Schedule;
import org.elasticsearch.watcher.scheduler.schedule.Schedules;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.clock.ClockMock;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.support.template.Template;
import org.elasticsearch.watcher.transport.actions.stats.WatcherStatsResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
@ -48,6 +28,26 @@ import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.TestCluster;
import org.elasticsearch.watcher.WatcherPlugin;
import org.elasticsearch.watcher.actions.email.service.Authentication;
import org.elasticsearch.watcher.actions.email.service.Email;
import org.elasticsearch.watcher.actions.email.service.EmailService;
import org.elasticsearch.watcher.actions.email.service.Profile;
import org.elasticsearch.watcher.actions.webhook.HttpClient;
import org.elasticsearch.watcher.client.WatcherClient;
import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.history.WatchRecord;
import org.elasticsearch.watcher.scheduler.Scheduler;
import org.elasticsearch.watcher.scheduler.SchedulerMock;
import org.elasticsearch.watcher.scheduler.schedule.Schedule;
import org.elasticsearch.watcher.scheduler.schedule.Schedules;
import org.elasticsearch.watcher.support.WatcherUtils;
import org.elasticsearch.watcher.support.clock.ClockMock;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.watcher.support.template.Template;
import org.elasticsearch.watcher.transport.actions.stats.WatcherStatsResponse;
import org.elasticsearch.watcher.watch.Watch;
import org.elasticsearch.watcher.watch.WatchService;
import org.junit.After;
import org.junit.Before;
@ -352,6 +352,7 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
return (InternalTestCluster) ((WatcherWrappingCluster) cluster()).testCluster;
}
// We need this custom impl, because we have custom wipe logic. We don't want the watcher index templates to get deleted between tests
private final class WatcherWrappingCluster extends TestCluster {
private final TestCluster testCluster;
@ -411,11 +412,6 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
return testCluster.numDataAndMasterNodes();
}
@Override
public int numBenchNodes() {
return testCluster.numBenchNodes();
}
@Override
public InetSocketAddress[] httpAddresses() {
return testCluster.httpAddresses();
@ -423,45 +419,6 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
@Override
public void close() throws IOException {
InternalTestCluster _testCluster = (InternalTestCluster) testCluster;
Set<String> nodes = new HashSet<>(Arrays.asList(_testCluster.getNodeNames()));
String masterNode = _testCluster.getMasterName();
nodes.remove(masterNode);
// First manually stop watcher on non elected master node, this will prevent that watcher becomes active
// on these nodes
for (String node : nodes) {
WatchService watchService = _testCluster.getInstance(WatchService.class, node);
assertThat(watchService.state(), equalTo(WatchService.State.STOPPED));
watchService.stop(); // Prevents these nodes from starting watcher when new elected master node is picked.
}
// Then stop watcher on elected master node and wait until watcher has stopped on it.
final WatchService watchService = _testCluster.getInstance(WatchService.class, masterNode);
try {
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(watchService.state(), not(equalTo(WatchService.State.STARTING)));
}
});
} catch (Exception e) {
throw new RuntimeException(e);
}
watchService.stop();
try {
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(watchService.state(), equalTo(WatchService.State.STOPPED));
}
});
} catch (Exception e) {
throw new RuntimeException(e);
}
// Now when can close nodes, without watcher trying to become active while nodes briefly become master
// during cluster shutdown.
testCluster.close();
}

View File

@ -5,17 +5,12 @@
*/
package org.elasticsearch.watcher.test;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
import org.elasticsearch.watcher.WatcherLifeCycleService;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -23,7 +18,6 @@ import java.util.Collections;
import java.util.Map;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;
/**
*
@ -53,18 +47,6 @@ public abstract class AbstractWatcherSingleNodeTests extends ElasticsearchSingle
return client().prepareIndex(index, type, id).setSource(doc).get();
}
protected ClusterHealthStatus ensureGreen(String... indices) {
ClusterHealthResponse actionGet = client().admin().cluster()
.health(Requests.clusterHealthRequest(indices).timeout(TimeValue.timeValueSeconds(30)).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
if (actionGet.isTimedOut()) {
logger.info("ensureGreen timed out, cluster state:\n{}\n{}", client().admin().cluster().prepareState().get().getState().prettyPrint(), client().admin().cluster().preparePendingClusterTasks().get().prettyPrint());
assertThat("timed out waiting for green state", actionGet.isTimedOut(), equalTo(false));
}
assertThat(actionGet.getStatus(), equalTo(ClusterHealthStatus.GREEN));
logger.debug("indices {} are green", indices.length == 0 ? "[_all]" : indices);
return actionGet.getStatus();
}
protected RefreshResponse refresh() {
RefreshResponse actionGet = client().admin().indices().prepareRefresh().execute().actionGet();
assertNoFailures(actionGet);