Refactored life cycle starting.

* The validation happens separately from the actual starting and loading. Also the validation happens on the cluster update thread, so that we don't miss any cluster state update.
* The starting/loading part happens on a forked thread, but if it fails then it always retry after a small timeout.
* Simplified the starting code, so that we don't need the callback interface.

Closes elastic/elasticsearch#212

Original commit: elastic/x-pack-elasticsearch@b5cd48e5bb
This commit is contained in:
Martijn van Groningen 2015-04-23 13:39:16 +02:00
parent 41e42f0945
commit ec8c9046b3
10 changed files with 137 additions and 203 deletions

View File

@ -65,7 +65,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
}
@Override
public void clusterChanged(final ClusterChangedEvent event) {
public void clusterChanged(ClusterChangedEvent event) {
if (!event.localNodeMaster()) {
// We're no longer the master so we need to stop the watcher.
// Stopping the watcher may take a while since it will wait on the scheduler to complete shutdown,
@ -83,11 +83,39 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
// a .watch_history index, but they may not have been restored from the cluster state on disk
return;
}
final ClusterState state = event.state();
if (!watchService.validate(state)) {
return;
}
if (watchService.state() == WatchService.State.STOPPED && !manuallyStopped) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
start(event.state());
int attempts = 0;
while(true) {
try {
start(state);
return;
} catch (Exception e) {
if (++attempts < 3) {
logger.warn("error occurred while starting, retrying...", e);
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
if (!clusterService.localNode().masterNode()) {
logger.error("abort retry, we are no longer master");
return;
}
} else {
logger.error("attempted to start Watcher [{}] times, aborting now, please try to start Watcher manually", attempts);
return;
}
}
}
}
});
}

View File

@ -8,10 +8,8 @@ package org.elasticsearch.watcher.execution;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
@ -23,7 +21,6 @@ import org.elasticsearch.watcher.condition.Condition;
import org.elasticsearch.watcher.history.HistoryStore;
import org.elasticsearch.watcher.history.WatchRecord;
import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.support.Callback;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.throttle.Throttler;
import org.elasticsearch.watcher.transform.ExecutableTransform;
@ -40,7 +37,6 @@ import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
/**
@ -55,7 +51,6 @@ public class ExecutionService extends AbstractComponent {
private final Clock clock;
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicInteger initializationRetries = new AtomicInteger();
@Inject
public ExecutionService(Settings settings, HistoryStore historyStore, WatchExecutor executor, WatchStore watchStore,
@ -69,25 +64,23 @@ public class ExecutionService extends AbstractComponent {
this.clock = clock;
}
public void start(ClusterState state, Callback<ClusterState> callback) {
public void start(ClusterState state) {
if (started.get()) {
callback.onSuccess(state);
return;
}
assert executor.queue().isEmpty() : "queue should be empty, but contains " + executor.queue().size() + " elements.";
Collection<WatchRecord> records = historyStore.loadRecords(state, WatchRecord.State.AWAITS_EXECUTION);
if (records == null) {
retry(callback);
return;
}
if (started.compareAndSet(false, true)) {
logger.debug("starting execution service");
historyStore.start();
executeRecords(records);
logger.debug("started execution service");
}
callback.onSuccess(state);
}
public boolean validate(ClusterState state) {
return historyStore.validate(state);
}
public void stop() {
@ -299,32 +292,6 @@ public class ExecutionService extends AbstractComponent {
logger.debug("executed [{}] watches from the watch history", counter);
}
private void retry(final Callback<ClusterState> callback) {
ClusterStateListener clusterStateListener = new ClusterStateListener() {
@Override
public void clusterChanged(final ClusterChangedEvent event) {
// Remove listener, so that it doesn't get called on the next cluster state update:
assert initializationRetries.decrementAndGet() == 0 : "Only one retry can run at the time";
clusterService.remove(this);
// We fork into another thread, because start(...) is expensive and we can't call this from the cluster update thread.
executor.execute(new Runnable() {
@Override
public void run() {
try {
start(event.state(), callback);
} catch (Exception e) {
callback.onFailure(e);
}
}
});
}
};
assert initializationRetries.incrementAndGet() == 1 : "Only one retry can run at the time";
clusterService.add(clusterStateListener);
}
private final class WatchExecutionTask implements Runnable {
private final WatchRecord watchRecord;

View File

@ -11,8 +11,8 @@ import org.elasticsearch.watcher.WatcherException;
*/
public class HistoryException extends WatcherException {
public HistoryException(String msg) {
super(msg);
public HistoryException(String msg, Object... args) {
super(msg, args);
}
public HistoryException(String msg, Throwable cause) {

View File

@ -79,6 +79,26 @@ public class HistoryStore extends AbstractComponent {
started.set(true);
}
public boolean validate(ClusterState state) {
String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), INDEX_PREFIX + "*");
if (indices.length == 0) {
logger.debug("no history indices exist, so we can load");
return true;
}
for (String index : indices) {
IndexMetaData indexMetaData = state.getMetaData().index(index);
if (indexMetaData != null) {
if (!state.routingTable().index(index).allPrimaryShardsActive()) {
logger.debug("not all primary shards of the [{}] index are started, so we cannot load watcher records", index);
return false;
}
}
}
return true;
}
public void stop() {
stopLock.lock(); //This will block while put or update actions are underway
try {
@ -223,7 +243,7 @@ public class HistoryStore extends AbstractComponent {
public Collection<WatchRecord> loadRecords(ClusterState state, WatchRecord.State recordState) {
String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), INDEX_PREFIX + "*");
if (indices.length == 0) {
logger.debug("No .watch_history indices found. skipping loading awaiting watch records");
logger.debug("no .watch_history indices found. skipping loading awaiting watch records");
templateUtils.ensureIndexTemplateIsLoaded(state, INDEX_TEMPLATE_NAME);
return Collections.emptySet();
}
@ -232,8 +252,8 @@ public class HistoryStore extends AbstractComponent {
IndexMetaData indexMetaData = state.getMetaData().index(index);
if (indexMetaData != null) {
if (!state.routingTable().index(index).allPrimaryShardsActive()) {
logger.debug("Not all primary shards of the [{}] index are started. Schedule to retry loading awaiting watch records..", index);
return null;
logger.debug("not all primary shards of the [{}] index are started.", index);
throw new HistoryException("not all primary shards of the [{}] index are started.", index);
} else {
numPrimaryShards += indexMetaData.numberOfShards();
}
@ -242,7 +262,7 @@ public class HistoryStore extends AbstractComponent {
RefreshResponse refreshResponse = client.refresh(new RefreshRequest(INDEX_PREFIX + "*"));
if (refreshResponse.getSuccessfulShards() < numPrimaryShards) {
return null;
throw new HistoryException("refresh was supposed to run on [{}] shards, but ran on [{}] shards", numPrimaryShards, refreshResponse.getSuccessfulShards());
}
SearchRequest searchRequest = createScanSearchRequest(recordState);
@ -250,7 +270,7 @@ public class HistoryStore extends AbstractComponent {
List<WatchRecord> records = new ArrayList<>();
try {
if (response.getTotalShards() != response.getSuccessfulShards()) {
return null;
throw new HistoryException("scan search was supposed to run on [{}] shards, but ran on [{}] shards", numPrimaryShards, response.getSuccessfulShards());
}
if (response.getHits().getTotalHits() > 0) {

View File

@ -1,15 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support;
/**
*/
public interface Callback<T> {
void onSuccess(T t);
void onFailure(Throwable e);
}

View File

@ -15,7 +15,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.execution.ExecutionService;
import org.elasticsearch.watcher.support.Callback;
import org.elasticsearch.watcher.trigger.TriggerService;
import java.io.IOException;
@ -46,34 +45,18 @@ public class WatchService extends AbstractComponent {
watchLockService.start();
// Try to load watch store before the execution service, b/c action depends on watch store
watchStore.start(clusterState, new Callback<ClusterState>() {
@Override
public void onSuccess(ClusterState clusterState) {
executionService.start(clusterState, new Callback<ClusterState>() {
@Override
public void onSuccess(ClusterState clusterState) {
triggerService.start(watchStore.watches().values());
state.set(State.STARTED);
logger.info("watch service has started");
}
@Override
public void onFailure(Throwable e) {
logger.error("failed to start watch service", e);
}
});
}
@Override
public void onFailure(Throwable e) {
logger.error("failed to start watch service", e);
}
});
watchStore.start(clusterState);
executionService.start(clusterState);
triggerService.start(watchStore.watches().values());
state.set(State.STARTED);
logger.info("watch service has started");
}
}
public boolean validate(ClusterState state) {
return watchStore.validate(state) && executionService.validate(state);
}
public void stop() {
if (state.compareAndSet(State.STARTED, State.STOPPING)) {
logger.info("stopping watch service...");

View File

@ -16,10 +16,7 @@ import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
@ -30,16 +27,13 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.WatcherException;
import org.elasticsearch.watcher.support.Callback;
import org.elasticsearch.watcher.support.TemplateUtils;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
*/
@ -52,66 +46,61 @@ public class WatchStore extends AbstractComponent {
private final ClientProxy client;
private final TemplateUtils templateUtils;
private final Watch.Parser watchParser;
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final ConcurrentMap<String, Watch> watches;
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicInteger initializationRetries = new AtomicInteger();
private final int scrollSize;
private final TimeValue scrollTimeout;
@Inject
public WatchStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, Watch.Parser watchParser,
ClusterService clusterService, ThreadPool threadPool) {
public WatchStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, Watch.Parser watchParser) {
super(settings);
this.client = client;
this.templateUtils = templateUtils;
this.watchParser = watchParser;
this.clusterService = clusterService;
this.threadPool = threadPool;
this.watches = ConcurrentCollections.newConcurrentMap();
this.scrollTimeout = componentSettings.getAsTime("scroll.timeout", TimeValue.timeValueSeconds(30));
this.scrollSize = componentSettings.getAsInt("scroll.size", 100);
}
public void start(ClusterState state, Callback<ClusterState> callback) {
public void start(ClusterState state) {
if (started.get()) {
callback.onSuccess(state);
logger.debug("watch store already started");
return;
}
IndexMetaData watchesIndexMetaData = state.getMetaData().index(INDEX);
if (watchesIndexMetaData == null) {
logger.trace("watches index [{}] was not found. skipping loading watches...", INDEX);
templateUtils.ensureIndexTemplateIsLoaded(state, INDEX_TEMPLATE);
started.set(true);
callback.onSuccess(state);
return;
}
if (state.routingTable().index(INDEX).allPrimaryShardsActive()) {
logger.debug("watches index [{}] found with all active primary shards. loading watches...", INDEX);
if (watchesIndexMetaData != null) {
try {
int count = loadWatches(watchesIndexMetaData.numberOfShards());
logger.debug("loaded [{}] watches from the watches index [{}]", count, INDEX);
templateUtils.ensureIndexTemplateIsLoaded(state, INDEX_TEMPLATE);
started.set(true);
} catch (Exception e) {
logger.debug("failed to load watches for watch index [{}]. scheduled to retry watches loading...", e, INDEX);
logger.debug("failed to load watches for watch index [{}]", e, INDEX);
watches.clear();
retry(callback);
return;
}
} else {
templateUtils.ensureIndexTemplateIsLoaded(state, INDEX_TEMPLATE);
started.set(true);
callback.onSuccess(state);
} else {
logger.debug("not all primary shards of the watches index [{}] are started. scheduled to retry loading watches...", INDEX);
retry(callback);
}
}
public boolean validate(ClusterState state) {
IndexMetaData watchesIndexMetaData = state.getMetaData().index(INDEX);
if (watchesIndexMetaData == null) {
logger.debug("watches index [{}] doesn't exist, so we can start", INDEX);
return true;
}
if (state.routingTable().index(INDEX).allPrimaryShardsActive()) {
logger.debug("watches index [{}] exists and all primary shards are started, so we can start", INDEX);
return true;
}
return false;
}
public boolean started() {
return started.get();
}
@ -197,36 +186,6 @@ public class WatchStore extends AbstractComponent {
return indexRequest;
}
private void retry(final Callback<ClusterState> callback) {
ClusterStateListener clusterStateListener = new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
final ClusterState state = event.state();
IndexMetaData watchesIndexMetaData = state.getMetaData().index(INDEX);
if (watchesIndexMetaData != null) {
if (state.routingTable().index(INDEX).allPrimaryShardsActive()) {
// Remove listener, so that it doesn't get called on the next cluster state update:
assert initializationRetries.decrementAndGet() == 0 : "Only one retry can run at the time";
clusterService.remove(this);
// We fork into another thread, because start(...) is expensive and we can't call this from the cluster update thread.
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
try {
start(state, callback);
} catch (Exception e) {
callback.onFailure(e);
}
}
});
}
}
}
};
clusterService.add(clusterStateListener);
assert initializationRetries.incrementAndGet() == 1 : "Only one retry can run at the time";
}
/**
* scrolls all the watch documents in the watches index, parses them, and loads them into
* the given map.

View File

@ -51,6 +51,7 @@ public class WatcherLifeCycleServiceTest extends ElasticsearchTestCase {
ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(nodes).build();
when(watchService.state()).thenReturn(WatchService.State.STOPPED);
when(watchService.validate(clusterState)).thenReturn(true);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
verify(watchService, times(1)).start(clusterState);
verify(watchService, never()).stop();
@ -118,6 +119,7 @@ public class WatcherLifeCycleServiceTest extends ElasticsearchTestCase {
nodes = new DiscoveryNodes.Builder().masterNodeId("id1").localNodeId("id1");
clusterState = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(nodes).build();
when(watchService.validate(clusterState)).thenReturn(true);
when(watchService.state()).thenReturn(WatchService.State.STOPPED);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
verify(watchService, times(3)).start(any(ClusterState.class));

View File

@ -44,10 +44,8 @@ import java.util.Collection;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
import static org.elasticsearch.watcher.test.WatcherMatchers.indexRequest;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.hamcrest.core.IsNull.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
@ -157,6 +155,7 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
csBuilder.metaData(metaDateBuilder);
ClusterState cs = csBuilder.build();
assertThat(historyStore.validate(cs), is(true));
Collection<WatchRecord> records = historyStore.loadRecords(cs, WatchRecord.State.AWAITS_EXECUTION);
assertThat(records, notNullValue());
assertThat(records, hasSize(0));
@ -196,8 +195,13 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
csBuilder.routingTable(routingTableBuilder);
ClusterState cs = csBuilder.build();
Collection<WatchRecord> records = historyStore.loadRecords(cs, WatchRecord.State.AWAITS_EXECUTION);
assertThat(records, nullValue());
assertThat(historyStore.validate(cs), is(false));
try {
historyStore.loadRecords(cs, WatchRecord.State.AWAITS_EXECUTION);
fail("exception expected, because not all primary shards are started");
} catch (HistoryException e) {
assertThat(e.getMessage(), containsString("not all primary shards of the [.watch_history_"));
}
verifyZeroInteractions(templateUtils);
verifyZeroInteractions(clientProxy);
@ -225,10 +229,15 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
csBuilder.routingTable(routingTableBuilder);
ClusterState cs = csBuilder.build();
assertThat(historyStore.validate(cs), is(true));
RefreshResponse refreshResponse = mockRefreshResponse(1, 0);
when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse);
Collection<WatchRecord> records = historyStore.loadRecords(cs, WatchRecord.State.AWAITS_EXECUTION);
assertThat(records, nullValue());
try {
historyStore.loadRecords(cs, WatchRecord.State.AWAITS_EXECUTION);
fail("exception expected, because refresh did't manage to run on all primary shards");
} catch (HistoryException e) {
assertThat(e.getMessage(), equalTo("refresh was supposed to run on [1] shards, but ran on [0] shards"));
}
verifyZeroInteractions(templateUtils);
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
@ -266,9 +275,13 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 1));
Collection<WatchRecord> records = historyStore.loadRecords(cs, WatchRecord.State.AWAITS_EXECUTION);
assertThat(records, nullValue());
assertThat(historyStore.validate(cs), is(true));
try {
historyStore.loadRecords(cs, WatchRecord.State.AWAITS_EXECUTION);
fail("exception expected, because scan search didn't manage to run on all shards");
} catch (HistoryException e) {
assertThat(e.getMessage(), equalTo("scan search was supposed to run on [1] shards, but ran on [0] shards"));
}
verifyZeroInteractions(templateUtils);
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
}
@ -306,6 +319,7 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 1));
assertThat(historyStore.validate(cs), is(true));
Collection<WatchRecord> records = historyStore.loadRecords(cs, WatchRecord.State.AWAITS_EXECUTION);
assertThat(records, IsNull.notNullValue());
assertThat(records, hasSize(0));
@ -363,6 +377,7 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 1));
assertThat(historyStore.validate(cs), is(true));
Collection<WatchRecord> records = historyStore.loadRecords(cs, WatchRecord.State.AWAITS_EXECUTION);
assertThat(records, notNullValue());
assertThat(records, hasSize(0));

View File

@ -7,14 +7,11 @@ package org.elasticsearch.watcher.watch;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.search.*;
import org.elasticsearch.watcher.support.Callback;
import org.elasticsearch.watcher.support.TemplateUtils;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.*;
@ -29,7 +26,8 @@ import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.support.TemplateUtils;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.junit.Before;
import org.junit.Test;
@ -48,19 +46,13 @@ public class WatchStoreTests extends ElasticsearchTestCase {
private ClientProxy clientProxy;
private TemplateUtils templateUtils;
private Watch.Parser parser;
private ClusterService clusterService;
private ThreadPool threadPool;
private Callback<ClusterState> callback;
@Before
public void init() {
clientProxy = mock(ClientProxy.class);
templateUtils = mock(TemplateUtils.class);
parser = mock(Watch.Parser.class);
clusterService = mock(ClusterService.class);
threadPool = mock(ThreadPool.class);
callback = mock(Callback.class);
watchStore = new WatchStore(ImmutableSettings.EMPTY, clientProxy, templateUtils, parser, clusterService, threadPool);
watchStore = new WatchStore(ImmutableSettings.EMPTY, clientProxy, templateUtils, parser);
}
@Test
@ -70,21 +62,16 @@ public class WatchStoreTests extends ElasticsearchTestCase {
csBuilder.metaData(metaDateBuilder);
ClusterState cs = csBuilder.build();
watchStore.start(cs, callback);
assertThat(watchStore.validate(cs), is(true));
watchStore.start(cs);
assertThat(watchStore.started(), is(true));
assertThat(watchStore.watches().size(), equalTo(0));
verify(callback, times(1)).onSuccess(any(ClusterState.class));
verify(callback, never()).onFailure(any(Throwable.class));
verify(templateUtils, times(1)).ensureIndexTemplateIsLoaded(cs, "watches");
verifyZeroInteractions(clientProxy);
verifyZeroInteractions(clusterService);
watchStore.start(cs, callback);
verify(callback, times(2)).onSuccess(any(ClusterState.class));
verify(callback, never()).onFailure(any(Throwable.class));
watchStore.start(cs);
verifyNoMoreInteractions(templateUtils);
verifyZeroInteractions(clientProxy);
verifyZeroInteractions(clusterService);
}
@Test
@ -107,10 +94,7 @@ public class WatchStoreTests extends ElasticsearchTestCase {
csBuilder.routingTable(routingTableBuilder);
ClusterState cs = csBuilder.build();
watchStore.start(cs, callback);
verify(clusterService, timeout(1)).add(any(ClusterStateListener.class));
verifyZeroInteractions(callback);
assertThat(watchStore.validate(cs), is(false));
verifyZeroInteractions(templateUtils);
verifyZeroInteractions(clientProxy);
}
@ -139,9 +123,8 @@ public class WatchStoreTests extends ElasticsearchTestCase {
ClusterState cs = csBuilder.build();
watchStore.start(cs, callback);
verify(clusterService, timeout(1)).add(any(ClusterStateListener.class));
verifyZeroInteractions(callback);
assertThat(watchStore.validate(cs), is(true));
watchStore.start(cs);
verifyZeroInteractions(templateUtils);
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
verify(clientProxy, never()).search(any(SearchRequest.class));
@ -176,10 +159,8 @@ public class WatchStoreTests extends ElasticsearchTestCase {
when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 0));
ClusterState cs = csBuilder.build();
watchStore.start(cs, callback);
verify(clusterService, timeout(1)).add(any(ClusterStateListener.class));
verifyZeroInteractions(callback);
assertThat(watchStore.validate(cs), is(true));
watchStore.start(cs);
verifyZeroInteractions(templateUtils);
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
verify(clientProxy, times(1)).search(any(SearchRequest.class));
@ -214,13 +195,10 @@ public class WatchStoreTests extends ElasticsearchTestCase {
when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 0));
ClusterState cs = csBuilder.build();
watchStore.start(cs, callback);
verifyZeroInteractions(clusterService);
assertThat(watchStore.validate(cs), is(true));
watchStore.start(cs);
assertThat(watchStore.started(), is(true));
assertThat(watchStore.watches().size(), equalTo(0));
verify(callback, times(1)).onSuccess(any(ClusterState.class));
verify(callback, never()).onFailure(any(Throwable.class));
verify(templateUtils, times(1)).ensureIndexTemplateIsLoaded(cs, "watches");
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
verify(clientProxy, times(1)).search(any(SearchRequest.class));
@ -272,13 +250,10 @@ public class WatchStoreTests extends ElasticsearchTestCase {
when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 0));
ClusterState cs = csBuilder.build();
watchStore.start(cs, callback);
verifyZeroInteractions(clusterService);
assertThat(watchStore.validate(cs), is(true));
watchStore.start(cs);
assertThat(watchStore.started(), is(true));
assertThat(watchStore.watches().size(), equalTo(2));
verify(callback, times(1)).onSuccess(any(ClusterState.class));
verify(callback, never()).onFailure(any(Throwable.class));
verify(templateUtils, times(1)).ensureIndexTemplateIsLoaded(cs, "watches");
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
verify(clientProxy, times(1)).search(any(SearchRequest.class));