When Watcher starts verify that all index templates have been added, otherwise add the index templates
There maybe a small window of time just after Watcher has started that the index templates don't exist. Original commit: elastic/x-pack-elasticsearch@9af3018be4
This commit is contained in:
parent
01fd8496c6
commit
28e384d104
|
@ -16,6 +16,7 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
||||||
import org.elasticsearch.watcher.execution.ExecutionService;
|
import org.elasticsearch.watcher.execution.ExecutionService;
|
||||||
|
import org.elasticsearch.watcher.support.WatcherIndexTemplateRegistry;
|
||||||
import org.elasticsearch.watcher.support.clock.Clock;
|
import org.elasticsearch.watcher.support.clock.Clock;
|
||||||
import org.elasticsearch.watcher.trigger.TriggerService;
|
import org.elasticsearch.watcher.trigger.TriggerService;
|
||||||
import org.elasticsearch.watcher.watch.Watch;
|
import org.elasticsearch.watcher.watch.Watch;
|
||||||
|
@ -39,11 +40,12 @@ public class WatcherService extends AbstractComponent {
|
||||||
private final WatchStore watchStore;
|
private final WatchStore watchStore;
|
||||||
private final WatchLockService watchLockService;
|
private final WatchLockService watchLockService;
|
||||||
private final ExecutionService executionService;
|
private final ExecutionService executionService;
|
||||||
|
private final WatcherIndexTemplateRegistry watcherIndexTemplateRegistry;
|
||||||
private final AtomicReference<WatcherState> state = new AtomicReference<>(WatcherState.STOPPED);
|
private final AtomicReference<WatcherState> state = new AtomicReference<>(WatcherState.STOPPED);
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public WatcherService(Settings settings, Clock clock, TriggerService triggerService, WatchStore watchStore,
|
public WatcherService(Settings settings, Clock clock, TriggerService triggerService, WatchStore watchStore,
|
||||||
Watch.Parser watchParser, ExecutionService executionService, WatchLockService watchLockService) {
|
Watch.Parser watchParser, ExecutionService executionService, WatchLockService watchLockService, WatcherIndexTemplateRegistry watcherIndexTemplateRegistry) {
|
||||||
super(settings);
|
super(settings);
|
||||||
this.clock = clock;
|
this.clock = clock;
|
||||||
this.triggerService = triggerService;
|
this.triggerService = triggerService;
|
||||||
|
@ -51,12 +53,14 @@ public class WatcherService extends AbstractComponent {
|
||||||
this.watchParser = watchParser;
|
this.watchParser = watchParser;
|
||||||
this.watchLockService = watchLockService;
|
this.watchLockService = watchLockService;
|
||||||
this.executionService = executionService;
|
this.executionService = executionService;
|
||||||
|
this.watcherIndexTemplateRegistry = watcherIndexTemplateRegistry;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start(ClusterState clusterState) throws Exception {
|
public void start(ClusterState clusterState) throws Exception {
|
||||||
if (state.compareAndSet(WatcherState.STOPPED, WatcherState.STARTING)) {
|
if (state.compareAndSet(WatcherState.STOPPED, WatcherState.STARTING)) {
|
||||||
try {
|
try {
|
||||||
logger.info("starting watch service...");
|
logger.info("starting watch service...");
|
||||||
|
watcherIndexTemplateRegistry.addTemplatesIfMissing();
|
||||||
watchLockService.start();
|
watchLockService.start();
|
||||||
|
|
||||||
// Try to load watch store before the execution service, b/c action depends on watch store
|
// Try to load watch store before the execution service, b/c action depends on watch store
|
||||||
|
|
|
@ -80,9 +80,23 @@ public class WatcherIndexTemplateRegistry extends AbstractComponent implements C
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
putTemplatesIfMissing(state);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds the registered index templates if missing to the cluster.
|
||||||
|
*/
|
||||||
|
public void addTemplatesIfMissing() {
|
||||||
|
putTemplatesIfMissing(clusterService.state());
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized void putTemplatesIfMissing(ClusterState state) {
|
||||||
for (TemplateConfig template : indexTemplates) {
|
for (TemplateConfig template : indexTemplates) {
|
||||||
if (!state.metaData().getTemplates().containsKey(template.getTemplateName())) {
|
if (!state.metaData().getTemplates().containsKey(template.getTemplateName())) {
|
||||||
|
logger.debug("adding index template [{}], because it doesn't exist", template.getTemplateName());
|
||||||
putTemplate(template);
|
putTemplate(template);
|
||||||
|
} else {
|
||||||
|
logger.trace("not adding index template [{}], because it already exists", template.getTemplateName());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@
|
||||||
package org.elasticsearch.watcher.test;
|
package org.elasticsearch.watcher.test;
|
||||||
|
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
|
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
|
||||||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||||
import org.elasticsearch.action.search.SearchResponse;
|
import org.elasticsearch.action.search.SearchResponse;
|
||||||
import org.elasticsearch.action.search.SearchType;
|
import org.elasticsearch.action.search.SearchType;
|
||||||
|
@ -30,10 +31,7 @@ import org.elasticsearch.shield.crypto.InternalCryptoService;
|
||||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||||
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
|
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
|
||||||
import org.elasticsearch.test.TestCluster;
|
import org.elasticsearch.test.TestCluster;
|
||||||
import org.elasticsearch.watcher.WatcherLifeCycleService;
|
import org.elasticsearch.watcher.*;
|
||||||
import org.elasticsearch.watcher.WatcherPlugin;
|
|
||||||
import org.elasticsearch.watcher.WatcherService;
|
|
||||||
import org.elasticsearch.watcher.WatcherState;
|
|
||||||
import org.elasticsearch.watcher.actions.email.service.Authentication;
|
import org.elasticsearch.watcher.actions.email.service.Authentication;
|
||||||
import org.elasticsearch.watcher.actions.email.service.Email;
|
import org.elasticsearch.watcher.actions.email.service.Email;
|
||||||
import org.elasticsearch.watcher.actions.email.service.EmailService;
|
import org.elasticsearch.watcher.actions.email.service.EmailService;
|
||||||
|
@ -71,6 +69,7 @@ import java.util.Map;
|
||||||
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
|
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
|
||||||
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
|
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
|
||||||
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.SUITE;
|
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.SUITE;
|
||||||
|
import static org.elasticsearch.watcher.WatcherModule.*;
|
||||||
import static org.hamcrest.Matchers.*;
|
import static org.hamcrest.Matchers.*;
|
||||||
import static org.hamcrest.core.Is.is;
|
import static org.hamcrest.core.Is.is;
|
||||||
import static org.hamcrest.core.IsNot.not;
|
import static org.hamcrest.core.IsNot.not;
|
||||||
|
@ -433,6 +432,11 @@ public abstract class AbstractWatcherIntegrationTests extends ElasticsearchInteg
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
// Verify that the index templates exist:
|
||||||
|
GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates(
|
||||||
|
HISTORY_TEMPLATE_NAME, TRIGGERED_TEMPLATE_NAME, WATCHES_TEMPLATE_NAME
|
||||||
|
).get();
|
||||||
|
assertThat(response.getIndexTemplates().size(), equalTo(3));
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void ensureLicenseEnabled() throws Exception {
|
protected void ensureLicenseEnabled() throws Exception {
|
||||||
|
|
|
@ -19,6 +19,7 @@ import org.elasticsearch.watcher.WatcherService;
|
||||||
import org.elasticsearch.watcher.WatcherState;
|
import org.elasticsearch.watcher.WatcherState;
|
||||||
import org.elasticsearch.watcher.actions.ActionStatus;
|
import org.elasticsearch.watcher.actions.ActionStatus;
|
||||||
import org.elasticsearch.watcher.execution.ExecutionService;
|
import org.elasticsearch.watcher.execution.ExecutionService;
|
||||||
|
import org.elasticsearch.watcher.support.WatcherIndexTemplateRegistry;
|
||||||
import org.elasticsearch.watcher.support.clock.ClockMock;
|
import org.elasticsearch.watcher.support.clock.ClockMock;
|
||||||
import org.elasticsearch.watcher.support.clock.SystemClock;
|
import org.elasticsearch.watcher.support.clock.SystemClock;
|
||||||
import org.elasticsearch.watcher.trigger.Trigger;
|
import org.elasticsearch.watcher.trigger.Trigger;
|
||||||
|
@ -45,7 +46,6 @@ public class WatchServiceTests extends ElasticsearchTestCase {
|
||||||
private WatchStore watchStore;
|
private WatchStore watchStore;
|
||||||
private Watch.Parser watchParser;
|
private Watch.Parser watchParser;
|
||||||
private WatcherService watcherService;
|
private WatcherService watcherService;
|
||||||
private ExecutionService executionService;
|
|
||||||
private WatchLockService watchLockService;
|
private WatchLockService watchLockService;
|
||||||
private ClockMock clock;
|
private ClockMock clock;
|
||||||
|
|
||||||
|
@ -54,10 +54,11 @@ public class WatchServiceTests extends ElasticsearchTestCase {
|
||||||
triggerService = mock(TriggerService.class);
|
triggerService = mock(TriggerService.class);
|
||||||
watchStore = mock(WatchStore.class);
|
watchStore = mock(WatchStore.class);
|
||||||
watchParser = mock(Watch.Parser.class);
|
watchParser = mock(Watch.Parser.class);
|
||||||
executionService = mock(ExecutionService.class);
|
ExecutionService executionService = mock(ExecutionService.class);
|
||||||
watchLockService = mock(WatchLockService.class);
|
watchLockService = mock(WatchLockService.class);
|
||||||
clock = new ClockMock();
|
clock = new ClockMock();
|
||||||
watcherService = new WatcherService(Settings.EMPTY, clock, triggerService, watchStore, watchParser, executionService, watchLockService);
|
WatcherIndexTemplateRegistry watcherIndexTemplateRegistry = mock(WatcherIndexTemplateRegistry.class);
|
||||||
|
watcherService = new WatcherService(Settings.EMPTY, clock, triggerService, watchStore, watchParser, executionService, watchLockService, watcherIndexTemplateRegistry);
|
||||||
Field field = WatcherService.class.getDeclaredField("state");
|
Field field = WatcherService.class.getDeclaredField("state");
|
||||||
field.setAccessible(true);
|
field.setAccessible(true);
|
||||||
AtomicReference<WatcherState> state = (AtomicReference<WatcherState>) field.get(watcherService);
|
AtomicReference<WatcherState> state = (AtomicReference<WatcherState>) field.get(watcherService);
|
||||||
|
|
Loading…
Reference in New Issue