diff --git a/src/main/java/org/elasticsearch/watcher/WatcherLifeCycleService.java b/src/main/java/org/elasticsearch/watcher/WatcherLifeCycleService.java index 29936d99910..65cb1ce771d 100644 --- a/src/main/java/org/elasticsearch/watcher/WatcherLifeCycleService.java +++ b/src/main/java/org/elasticsearch/watcher/WatcherLifeCycleService.java @@ -65,7 +65,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste } @Override - public void clusterChanged(final ClusterChangedEvent event) { + public void clusterChanged(ClusterChangedEvent event) { if (!event.localNodeMaster()) { // We're no longer the master so we need to stop the watcher. // Stopping the watcher may take a while since it will wait on the scheduler to complete shutdown, @@ -83,11 +83,39 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste // a .watch_history index, but they may not have been restored from the cluster state on disk return; } + + final ClusterState state = event.state(); + if (!watchService.validate(state)) { + return; + } + if (watchService.state() == WatchService.State.STOPPED && !manuallyStopped) { threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { @Override public void run() { - start(event.state()); + int attempts = 0; + while(true) { + try { + start(state); + return; + } catch (Exception e) { + if (++attempts < 3) { + logger.warn("error occurred while starting, retrying...", e); + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + if (!clusterService.localNode().masterNode()) { + logger.error("abort retry, we are no longer master"); + return; + } + } else { + logger.error("attempted to start Watcher [{}] times, aborting now, please try to start Watcher manually", attempts); + return; + } + } + } } }); } diff --git a/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java b/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java index 0bfc4a472b4..d38c9d85ba0 100644 --- a/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java +++ b/src/main/java/org/elasticsearch/watcher/execution/ExecutionService.java @@ -8,10 +8,8 @@ package org.elasticsearch.watcher.execution; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.DateTime; @@ -23,7 +21,6 @@ import org.elasticsearch.watcher.condition.Condition; import org.elasticsearch.watcher.history.HistoryStore; import org.elasticsearch.watcher.history.WatchRecord; import org.elasticsearch.watcher.input.Input; -import org.elasticsearch.watcher.support.Callback; import org.elasticsearch.watcher.support.clock.Clock; import org.elasticsearch.watcher.throttle.Throttler; import org.elasticsearch.watcher.transform.ExecutableTransform; @@ -40,7 +37,6 @@ import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.common.joda.time.DateTimeZone.UTC; /** @@ -55,7 +51,6 @@ public class ExecutionService extends AbstractComponent { private final Clock clock; private final AtomicBoolean started = new AtomicBoolean(false); - private final AtomicInteger initializationRetries = new AtomicInteger(); @Inject public ExecutionService(Settings settings, HistoryStore historyStore, WatchExecutor executor, WatchStore watchStore, @@ -69,25 +64,23 @@ public class ExecutionService extends AbstractComponent { this.clock = clock; } - public void start(ClusterState state, Callback callback) { + public void start(ClusterState state) { if (started.get()) { - callback.onSuccess(state); return; } assert executor.queue().isEmpty() : "queue should be empty, but contains " + executor.queue().size() + " elements."; Collection records = historyStore.loadRecords(state, WatchRecord.State.AWAITS_EXECUTION); - if (records == null) { - retry(callback); - return; - } if (started.compareAndSet(false, true)) { logger.debug("starting execution service"); historyStore.start(); executeRecords(records); logger.debug("started execution service"); } - callback.onSuccess(state); + } + + public boolean validate(ClusterState state) { + return historyStore.validate(state); } public void stop() { @@ -299,32 +292,6 @@ public class ExecutionService extends AbstractComponent { logger.debug("executed [{}] watches from the watch history", counter); } - private void retry(final Callback callback) { - ClusterStateListener clusterStateListener = new ClusterStateListener() { - - @Override - public void clusterChanged(final ClusterChangedEvent event) { - // Remove listener, so that it doesn't get called on the next cluster state update: - assert initializationRetries.decrementAndGet() == 0 : "Only one retry can run at the time"; - clusterService.remove(this); - // We fork into another thread, because start(...) is expensive and we can't call this from the cluster update thread. - executor.execute(new Runnable() { - - @Override - public void run() { - try { - start(event.state(), callback); - } catch (Exception e) { - callback.onFailure(e); - } - } - }); - } - }; - assert initializationRetries.incrementAndGet() == 1 : "Only one retry can run at the time"; - clusterService.add(clusterStateListener); - } - private final class WatchExecutionTask implements Runnable { private final WatchRecord watchRecord; diff --git a/src/main/java/org/elasticsearch/watcher/history/HistoryException.java b/src/main/java/org/elasticsearch/watcher/history/HistoryException.java index a92c3f5b55f..4f932bd5b58 100644 --- a/src/main/java/org/elasticsearch/watcher/history/HistoryException.java +++ b/src/main/java/org/elasticsearch/watcher/history/HistoryException.java @@ -11,8 +11,8 @@ import org.elasticsearch.watcher.WatcherException; */ public class HistoryException extends WatcherException { - public HistoryException(String msg) { - super(msg); + public HistoryException(String msg, Object... args) { + super(msg, args); } public HistoryException(String msg, Throwable cause) { diff --git a/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java b/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java index 705ba58b279..29c3cdcf17a 100644 --- a/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java +++ b/src/main/java/org/elasticsearch/watcher/history/HistoryStore.java @@ -79,6 +79,26 @@ public class HistoryStore extends AbstractComponent { started.set(true); } + public boolean validate(ClusterState state) { + String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), INDEX_PREFIX + "*"); + if (indices.length == 0) { + logger.debug("no history indices exist, so we can load"); + return true; + } + + for (String index : indices) { + IndexMetaData indexMetaData = state.getMetaData().index(index); + if (indexMetaData != null) { + if (!state.routingTable().index(index).allPrimaryShardsActive()) { + logger.debug("not all primary shards of the [{}] index are started, so we cannot load watcher records", index); + return false; + } + } + } + + return true; + } + public void stop() { stopLock.lock(); //This will block while put or update actions are underway try { @@ -223,7 +243,7 @@ public class HistoryStore extends AbstractComponent { public Collection loadRecords(ClusterState state, WatchRecord.State recordState) { String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), INDEX_PREFIX + "*"); if (indices.length == 0) { - logger.debug("No .watch_history indices found. skipping loading awaiting watch records"); + logger.debug("no .watch_history indices found. skipping loading awaiting watch records"); templateUtils.ensureIndexTemplateIsLoaded(state, INDEX_TEMPLATE_NAME); return Collections.emptySet(); } @@ -232,8 +252,8 @@ public class HistoryStore extends AbstractComponent { IndexMetaData indexMetaData = state.getMetaData().index(index); if (indexMetaData != null) { if (!state.routingTable().index(index).allPrimaryShardsActive()) { - logger.debug("Not all primary shards of the [{}] index are started. Schedule to retry loading awaiting watch records..", index); - return null; + logger.debug("not all primary shards of the [{}] index are started.", index); + throw new HistoryException("not all primary shards of the [{}] index are started.", index); } else { numPrimaryShards += indexMetaData.numberOfShards(); } @@ -242,7 +262,7 @@ public class HistoryStore extends AbstractComponent { RefreshResponse refreshResponse = client.refresh(new RefreshRequest(INDEX_PREFIX + "*")); if (refreshResponse.getSuccessfulShards() < numPrimaryShards) { - return null; + throw new HistoryException("refresh was supposed to run on [{}] shards, but ran on [{}] shards", numPrimaryShards, refreshResponse.getSuccessfulShards()); } SearchRequest searchRequest = createScanSearchRequest(recordState); @@ -250,7 +270,7 @@ public class HistoryStore extends AbstractComponent { List records = new ArrayList<>(); try { if (response.getTotalShards() != response.getSuccessfulShards()) { - return null; + throw new HistoryException("scan search was supposed to run on [{}] shards, but ran on [{}] shards", numPrimaryShards, response.getSuccessfulShards()); } if (response.getHits().getTotalHits() > 0) { diff --git a/src/main/java/org/elasticsearch/watcher/support/Callback.java b/src/main/java/org/elasticsearch/watcher/support/Callback.java deleted file mode 100644 index 42ba2a59447..00000000000 --- a/src/main/java/org/elasticsearch/watcher/support/Callback.java +++ /dev/null @@ -1,15 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.watcher.support; - -/** - */ -public interface Callback { - - void onSuccess(T t); - - void onFailure(Throwable e); -} diff --git a/src/main/java/org/elasticsearch/watcher/watch/WatchService.java b/src/main/java/org/elasticsearch/watcher/watch/WatchService.java index 3143adb184c..84e5c05267f 100644 --- a/src/main/java/org/elasticsearch/watcher/watch/WatchService.java +++ b/src/main/java/org/elasticsearch/watcher/watch/WatchService.java @@ -15,7 +15,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.watcher.WatcherException; import org.elasticsearch.watcher.execution.ExecutionService; -import org.elasticsearch.watcher.support.Callback; import org.elasticsearch.watcher.trigger.TriggerService; import java.io.IOException; @@ -46,34 +45,18 @@ public class WatchService extends AbstractComponent { watchLockService.start(); // Try to load watch store before the execution service, b/c action depends on watch store - watchStore.start(clusterState, new Callback() { - - @Override - public void onSuccess(ClusterState clusterState) { - executionService.start(clusterState, new Callback() { - - @Override - public void onSuccess(ClusterState clusterState) { - triggerService.start(watchStore.watches().values()); - state.set(State.STARTED); - logger.info("watch service has started"); - } - - @Override - public void onFailure(Throwable e) { - logger.error("failed to start watch service", e); - } - }); - } - - @Override - public void onFailure(Throwable e) { - logger.error("failed to start watch service", e); - } - }); + watchStore.start(clusterState); + executionService.start(clusterState); + triggerService.start(watchStore.watches().values()); + state.set(State.STARTED); + logger.info("watch service has started"); } } + public boolean validate(ClusterState state) { + return watchStore.validate(state) && executionService.validate(state); + } + public void stop() { if (state.compareAndSet(State.STARTED, State.STOPPING)) { logger.info("stopping watch service..."); diff --git a/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java b/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java index fcedcca03de..a2df1137333 100644 --- a/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java +++ b/src/main/java/org/elasticsearch/watcher/watch/WatchStore.java @@ -16,10 +16,7 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractComponent; @@ -30,16 +27,13 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.WatcherException; -import org.elasticsearch.watcher.support.Callback; import org.elasticsearch.watcher.support.TemplateUtils; import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import java.io.IOException; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; /** */ @@ -52,66 +46,61 @@ public class WatchStore extends AbstractComponent { private final ClientProxy client; private final TemplateUtils templateUtils; private final Watch.Parser watchParser; - private final ClusterService clusterService; - private final ThreadPool threadPool; private final ConcurrentMap watches; private final AtomicBoolean started = new AtomicBoolean(false); - private final AtomicInteger initializationRetries = new AtomicInteger(); private final int scrollSize; private final TimeValue scrollTimeout; @Inject - public WatchStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, Watch.Parser watchParser, - ClusterService clusterService, ThreadPool threadPool) { + public WatchStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, Watch.Parser watchParser) { super(settings); this.client = client; this.templateUtils = templateUtils; this.watchParser = watchParser; - this.clusterService = clusterService; - this.threadPool = threadPool; this.watches = ConcurrentCollections.newConcurrentMap(); this.scrollTimeout = componentSettings.getAsTime("scroll.timeout", TimeValue.timeValueSeconds(30)); this.scrollSize = componentSettings.getAsInt("scroll.size", 100); } - public void start(ClusterState state, Callback callback) { + public void start(ClusterState state) { if (started.get()) { - callback.onSuccess(state); + logger.debug("watch store already started"); return; } IndexMetaData watchesIndexMetaData = state.getMetaData().index(INDEX); - if (watchesIndexMetaData == null) { - logger.trace("watches index [{}] was not found. skipping loading watches...", INDEX); - templateUtils.ensureIndexTemplateIsLoaded(state, INDEX_TEMPLATE); - started.set(true); - callback.onSuccess(state); - return; - } - - if (state.routingTable().index(INDEX).allPrimaryShardsActive()) { - logger.debug("watches index [{}] found with all active primary shards. loading watches...", INDEX); + if (watchesIndexMetaData != null) { try { int count = loadWatches(watchesIndexMetaData.numberOfShards()); logger.debug("loaded [{}] watches from the watches index [{}]", count, INDEX); + templateUtils.ensureIndexTemplateIsLoaded(state, INDEX_TEMPLATE); + started.set(true); } catch (Exception e) { - logger.debug("failed to load watches for watch index [{}]. scheduled to retry watches loading...", e, INDEX); + logger.debug("failed to load watches for watch index [{}]", e, INDEX); watches.clear(); - retry(callback); - return; } + } else { templateUtils.ensureIndexTemplateIsLoaded(state, INDEX_TEMPLATE); started.set(true); - callback.onSuccess(state); - } else { - logger.debug("not all primary shards of the watches index [{}] are started. scheduled to retry loading watches...", INDEX); - retry(callback); } } + public boolean validate(ClusterState state) { + IndexMetaData watchesIndexMetaData = state.getMetaData().index(INDEX); + if (watchesIndexMetaData == null) { + logger.debug("watches index [{}] doesn't exist, so we can start", INDEX); + return true; + } + if (state.routingTable().index(INDEX).allPrimaryShardsActive()) { + logger.debug("watches index [{}] exists and all primary shards are started, so we can start", INDEX); + return true; + } + return false; + } + public boolean started() { return started.get(); } @@ -197,36 +186,6 @@ public class WatchStore extends AbstractComponent { return indexRequest; } - private void retry(final Callback callback) { - ClusterStateListener clusterStateListener = new ClusterStateListener() { - @Override - public void clusterChanged(ClusterChangedEvent event) { - final ClusterState state = event.state(); - IndexMetaData watchesIndexMetaData = state.getMetaData().index(INDEX); - if (watchesIndexMetaData != null) { - if (state.routingTable().index(INDEX).allPrimaryShardsActive()) { - // Remove listener, so that it doesn't get called on the next cluster state update: - assert initializationRetries.decrementAndGet() == 0 : "Only one retry can run at the time"; - clusterService.remove(this); - // We fork into another thread, because start(...) is expensive and we can't call this from the cluster update thread. - threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { - @Override - public void run() { - try { - start(state, callback); - } catch (Exception e) { - callback.onFailure(e); - } - } - }); - } - } - } - }; - clusterService.add(clusterStateListener); - assert initializationRetries.incrementAndGet() == 1 : "Only one retry can run at the time"; - } - /** * scrolls all the watch documents in the watches index, parses them, and loads them into * the given map. diff --git a/src/test/java/org/elasticsearch/watcher/WatcherLifeCycleServiceTest.java b/src/test/java/org/elasticsearch/watcher/WatcherLifeCycleServiceTest.java index 00db34399f0..910cfbb970e 100644 --- a/src/test/java/org/elasticsearch/watcher/WatcherLifeCycleServiceTest.java +++ b/src/test/java/org/elasticsearch/watcher/WatcherLifeCycleServiceTest.java @@ -51,6 +51,7 @@ public class WatcherLifeCycleServiceTest extends ElasticsearchTestCase { ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster")) .nodes(nodes).build(); when(watchService.state()).thenReturn(WatchService.State.STOPPED); + when(watchService.validate(clusterState)).thenReturn(true); lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState)); verify(watchService, times(1)).start(clusterState); verify(watchService, never()).stop(); @@ -118,6 +119,7 @@ public class WatcherLifeCycleServiceTest extends ElasticsearchTestCase { nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1"); clusterState = ClusterState.builder(new ClusterName("my-cluster")) .nodes(nodes).build(); + when(watchService.validate(clusterState)).thenReturn(true); when(watchService.state()).thenReturn(WatchService.State.STOPPED); lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState)); verify(watchService, times(3)).start(any(ClusterState.class)); diff --git a/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java b/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java index 9a2d881b143..09cf2925e85 100644 --- a/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java +++ b/src/test/java/org/elasticsearch/watcher/history/HistoryStoreTests.java @@ -44,10 +44,8 @@ import java.util.Collection; import static org.elasticsearch.common.joda.time.DateTimeZone.UTC; import static org.elasticsearch.watcher.test.WatcherMatchers.indexRequest; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.*; import static org.hamcrest.core.IsEqual.equalTo; -import static org.hamcrest.core.IsNull.nullValue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.*; @@ -157,6 +155,7 @@ public class HistoryStoreTests extends ElasticsearchTestCase { csBuilder.metaData(metaDateBuilder); ClusterState cs = csBuilder.build(); + assertThat(historyStore.validate(cs), is(true)); Collection records = historyStore.loadRecords(cs, WatchRecord.State.AWAITS_EXECUTION); assertThat(records, notNullValue()); assertThat(records, hasSize(0)); @@ -196,8 +195,13 @@ public class HistoryStoreTests extends ElasticsearchTestCase { csBuilder.routingTable(routingTableBuilder); ClusterState cs = csBuilder.build(); - Collection records = historyStore.loadRecords(cs, WatchRecord.State.AWAITS_EXECUTION); - assertThat(records, nullValue()); + assertThat(historyStore.validate(cs), is(false)); + try { + historyStore.loadRecords(cs, WatchRecord.State.AWAITS_EXECUTION); + fail("exception expected, because not all primary shards are started"); + } catch (HistoryException e) { + assertThat(e.getMessage(), containsString("not all primary shards of the [.watch_history_")); + } verifyZeroInteractions(templateUtils); verifyZeroInteractions(clientProxy); @@ -225,10 +229,15 @@ public class HistoryStoreTests extends ElasticsearchTestCase { csBuilder.routingTable(routingTableBuilder); ClusterState cs = csBuilder.build(); + assertThat(historyStore.validate(cs), is(true)); RefreshResponse refreshResponse = mockRefreshResponse(1, 0); when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); - Collection records = historyStore.loadRecords(cs, WatchRecord.State.AWAITS_EXECUTION); - assertThat(records, nullValue()); + try { + historyStore.loadRecords(cs, WatchRecord.State.AWAITS_EXECUTION); + fail("exception expected, because refresh did't manage to run on all primary shards"); + } catch (HistoryException e) { + assertThat(e.getMessage(), equalTo("refresh was supposed to run on [1] shards, but ran on [0] shards")); + } verifyZeroInteractions(templateUtils); verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); @@ -266,9 +275,13 @@ public class HistoryStoreTests extends ElasticsearchTestCase { when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 1)); - Collection records = historyStore.loadRecords(cs, WatchRecord.State.AWAITS_EXECUTION); - assertThat(records, nullValue()); - + assertThat(historyStore.validate(cs), is(true)); + try { + historyStore.loadRecords(cs, WatchRecord.State.AWAITS_EXECUTION); + fail("exception expected, because scan search didn't manage to run on all shards"); + } catch (HistoryException e) { + assertThat(e.getMessage(), equalTo("scan search was supposed to run on [1] shards, but ran on [0] shards")); + } verifyZeroInteractions(templateUtils); verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); } @@ -306,6 +319,7 @@ public class HistoryStoreTests extends ElasticsearchTestCase { when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 1)); + assertThat(historyStore.validate(cs), is(true)); Collection records = historyStore.loadRecords(cs, WatchRecord.State.AWAITS_EXECUTION); assertThat(records, IsNull.notNullValue()); assertThat(records, hasSize(0)); @@ -363,6 +377,7 @@ public class HistoryStoreTests extends ElasticsearchTestCase { when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 1)); + assertThat(historyStore.validate(cs), is(true)); Collection records = historyStore.loadRecords(cs, WatchRecord.State.AWAITS_EXECUTION); assertThat(records, notNullValue()); assertThat(records, hasSize(0)); diff --git a/src/test/java/org/elasticsearch/watcher/watch/WatchStoreTests.java b/src/test/java/org/elasticsearch/watcher/watch/WatchStoreTests.java index 0b9c1e22690..90e2c07303a 100644 --- a/src/test/java/org/elasticsearch/watcher/watch/WatchStoreTests.java +++ b/src/test/java/org/elasticsearch/watcher/watch/WatchStoreTests.java @@ -7,14 +7,11 @@ package org.elasticsearch.watcher.watch; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; -import org.elasticsearch.action.search.*; -import org.elasticsearch.watcher.support.Callback; -import org.elasticsearch.watcher.support.TemplateUtils; -import org.elasticsearch.watcher.support.init.proxy.ClientProxy; +import org.elasticsearch.action.search.ClearScrollResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.*; @@ -29,7 +26,8 @@ import org.elasticsearch.search.SearchHitField; import org.elasticsearch.search.internal.InternalSearchHit; import org.elasticsearch.search.internal.InternalSearchHits; import org.elasticsearch.test.ElasticsearchTestCase; -import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.support.TemplateUtils; +import org.elasticsearch.watcher.support.init.proxy.ClientProxy; import org.junit.Before; import org.junit.Test; @@ -48,19 +46,13 @@ public class WatchStoreTests extends ElasticsearchTestCase { private ClientProxy clientProxy; private TemplateUtils templateUtils; private Watch.Parser parser; - private ClusterService clusterService; - private ThreadPool threadPool; - private Callback callback; @Before public void init() { clientProxy = mock(ClientProxy.class); templateUtils = mock(TemplateUtils.class); parser = mock(Watch.Parser.class); - clusterService = mock(ClusterService.class); - threadPool = mock(ThreadPool.class); - callback = mock(Callback.class); - watchStore = new WatchStore(ImmutableSettings.EMPTY, clientProxy, templateUtils, parser, clusterService, threadPool); + watchStore = new WatchStore(ImmutableSettings.EMPTY, clientProxy, templateUtils, parser); } @Test @@ -70,21 +62,16 @@ public class WatchStoreTests extends ElasticsearchTestCase { csBuilder.metaData(metaDateBuilder); ClusterState cs = csBuilder.build(); - watchStore.start(cs, callback); + assertThat(watchStore.validate(cs), is(true)); + watchStore.start(cs); assertThat(watchStore.started(), is(true)); assertThat(watchStore.watches().size(), equalTo(0)); - verify(callback, times(1)).onSuccess(any(ClusterState.class)); - verify(callback, never()).onFailure(any(Throwable.class)); verify(templateUtils, times(1)).ensureIndexTemplateIsLoaded(cs, "watches"); verifyZeroInteractions(clientProxy); - verifyZeroInteractions(clusterService); - watchStore.start(cs, callback); - verify(callback, times(2)).onSuccess(any(ClusterState.class)); - verify(callback, never()).onFailure(any(Throwable.class)); + watchStore.start(cs); verifyNoMoreInteractions(templateUtils); verifyZeroInteractions(clientProxy); - verifyZeroInteractions(clusterService); } @Test @@ -107,10 +94,7 @@ public class WatchStoreTests extends ElasticsearchTestCase { csBuilder.routingTable(routingTableBuilder); ClusterState cs = csBuilder.build(); - - watchStore.start(cs, callback); - verify(clusterService, timeout(1)).add(any(ClusterStateListener.class)); - verifyZeroInteractions(callback); + assertThat(watchStore.validate(cs), is(false)); verifyZeroInteractions(templateUtils); verifyZeroInteractions(clientProxy); } @@ -139,9 +123,8 @@ public class WatchStoreTests extends ElasticsearchTestCase { ClusterState cs = csBuilder.build(); - watchStore.start(cs, callback); - verify(clusterService, timeout(1)).add(any(ClusterStateListener.class)); - verifyZeroInteractions(callback); + assertThat(watchStore.validate(cs), is(true)); + watchStore.start(cs); verifyZeroInteractions(templateUtils); verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); verify(clientProxy, never()).search(any(SearchRequest.class)); @@ -176,10 +159,8 @@ public class WatchStoreTests extends ElasticsearchTestCase { when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 0)); ClusterState cs = csBuilder.build(); - - watchStore.start(cs, callback); - verify(clusterService, timeout(1)).add(any(ClusterStateListener.class)); - verifyZeroInteractions(callback); + assertThat(watchStore.validate(cs), is(true)); + watchStore.start(cs); verifyZeroInteractions(templateUtils); verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); verify(clientProxy, times(1)).search(any(SearchRequest.class)); @@ -214,13 +195,10 @@ public class WatchStoreTests extends ElasticsearchTestCase { when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 0)); ClusterState cs = csBuilder.build(); - - watchStore.start(cs, callback); - verifyZeroInteractions(clusterService); + assertThat(watchStore.validate(cs), is(true)); + watchStore.start(cs); assertThat(watchStore.started(), is(true)); assertThat(watchStore.watches().size(), equalTo(0)); - verify(callback, times(1)).onSuccess(any(ClusterState.class)); - verify(callback, never()).onFailure(any(Throwable.class)); verify(templateUtils, times(1)).ensureIndexTemplateIsLoaded(cs, "watches"); verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); verify(clientProxy, times(1)).search(any(SearchRequest.class)); @@ -272,13 +250,10 @@ public class WatchStoreTests extends ElasticsearchTestCase { when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 0)); ClusterState cs = csBuilder.build(); - - watchStore.start(cs, callback); - verifyZeroInteractions(clusterService); + assertThat(watchStore.validate(cs), is(true)); + watchStore.start(cs); assertThat(watchStore.started(), is(true)); assertThat(watchStore.watches().size(), equalTo(2)); - verify(callback, times(1)).onSuccess(any(ClusterState.class)); - verify(callback, never()).onFailure(any(Throwable.class)); verify(templateUtils, times(1)).ensureIndexTemplateIsLoaded(cs, "watches"); verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); verify(clientProxy, times(1)).search(any(SearchRequest.class));