Watcher: Ensure all templates exist before starting watcher (elastic/x-pack-elasticsearch#2765)
This is to ensure that the required templates exist (which are added by the WatcherIndexTemplateRegistry) before actually starting watcher. Relates elastic/x-pack-elasticsearch#2761 Original commit: elastic/x-pack-elasticsearch@568088061f
This commit is contained in:
parent
c9682d02d4
commit
6f437c973a
|
@ -21,6 +21,7 @@ import org.elasticsearch.gateway.GatewayService;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.upgrade.Upgrade;
|
||||
import org.elasticsearch.xpack.watcher.execution.TriggeredWatchStore;
|
||||
import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry;
|
||||
import org.elasticsearch.xpack.watcher.watch.Watch;
|
||||
import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils;
|
||||
|
||||
|
@ -76,6 +77,13 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
|
|||
return;
|
||||
}
|
||||
|
||||
// ensure that templates are existing before starting watcher
|
||||
// the watcher index template registry is independent from watcher being started or stopped
|
||||
if (WatcherIndexTemplateRegistry.validate(state) == false) {
|
||||
logger.debug("not starting watcher, watcher templates are missing in the cluster state");
|
||||
return;
|
||||
}
|
||||
|
||||
if (watcherService.validate(state)) {
|
||||
logger.trace("starting... (based on cluster state version [{}]) (manual [{}])", state.getVersion(), manual);
|
||||
try {
|
||||
|
|
|
@ -127,6 +127,12 @@ public class WatcherIndexTemplateRegistry extends AbstractComponent implements C
|
|||
});
|
||||
}
|
||||
|
||||
public static boolean validate(ClusterState state) {
|
||||
return state.getMetaData().getTemplates().containsKey(HISTORY_TEMPLATE_NAME) &&
|
||||
state.getMetaData().getTemplates().containsKey(TRIGGERED_TEMPLATE_NAME) &&
|
||||
state.getMetaData().getTemplates().containsKey(WATCHES_TEMPLATE_NAME);
|
||||
}
|
||||
|
||||
public static class TemplateConfig {
|
||||
|
||||
private final String templateName;
|
||||
|
|
|
@ -13,6 +13,7 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
|
@ -30,6 +31,7 @@ import org.elasticsearch.test.ESTestCase;
|
|||
import org.elasticsearch.test.VersionUtils;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.watcher.execution.TriggeredWatchStore;
|
||||
import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry;
|
||||
import org.elasticsearch.xpack.watcher.watch.Watch;
|
||||
import org.junit.Before;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
@ -82,6 +84,11 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
|
|||
|
||||
IndexRoutingTable watchRoutingTable = IndexRoutingTable.builder(new Index(Watch.INDEX, "foo")).build();
|
||||
ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster"))
|
||||
.metaData(MetaData.builder()
|
||||
.put(IndexTemplateMetaData.builder(WatcherIndexTemplateRegistry.HISTORY_TEMPLATE_NAME))
|
||||
.put(IndexTemplateMetaData.builder(WatcherIndexTemplateRegistry.TRIGGERED_TEMPLATE_NAME))
|
||||
.put(IndexTemplateMetaData.builder(WatcherIndexTemplateRegistry.WATCHES_TEMPLATE_NAME))
|
||||
.build())
|
||||
.nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1")))
|
||||
.routingTable(RoutingTable.builder().add(watchRoutingTable).build())
|
||||
.build();
|
||||
|
@ -114,6 +121,11 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
|
|||
ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster"))
|
||||
.nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1")))
|
||||
.routingTable(RoutingTable.builder().add(watchRoutingTable).build())
|
||||
.metaData(MetaData.builder()
|
||||
.put(IndexTemplateMetaData.builder(WatcherIndexTemplateRegistry.HISTORY_TEMPLATE_NAME))
|
||||
.put(IndexTemplateMetaData.builder(WatcherIndexTemplateRegistry.TRIGGERED_TEMPLATE_NAME))
|
||||
.put(IndexTemplateMetaData.builder(WatcherIndexTemplateRegistry.WATCHES_TEMPLATE_NAME))
|
||||
.build())
|
||||
.build();
|
||||
|
||||
when(watcherService.validate(clusterState)).thenReturn(true);
|
||||
|
@ -138,6 +150,11 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
|
|||
// no change, keep going
|
||||
clusterState = ClusterState.builder(new ClusterName("my-cluster"))
|
||||
.nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1")))
|
||||
.metaData(MetaData.builder()
|
||||
.put(IndexTemplateMetaData.builder(WatcherIndexTemplateRegistry.HISTORY_TEMPLATE_NAME))
|
||||
.put(IndexTemplateMetaData.builder(WatcherIndexTemplateRegistry.TRIGGERED_TEMPLATE_NAME))
|
||||
.put(IndexTemplateMetaData.builder(WatcherIndexTemplateRegistry.WATCHES_TEMPLATE_NAME))
|
||||
.build())
|
||||
.build();
|
||||
when(watcherService.state()).thenReturn(WatcherState.STARTED);
|
||||
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
|
||||
|
@ -146,6 +163,11 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
|
|||
|
||||
ClusterState previousClusterState = ClusterState.builder(new ClusterName("my-cluster"))
|
||||
.nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1")))
|
||||
.metaData(MetaData.builder()
|
||||
.put(IndexTemplateMetaData.builder(WatcherIndexTemplateRegistry.HISTORY_TEMPLATE_NAME))
|
||||
.put(IndexTemplateMetaData.builder(WatcherIndexTemplateRegistry.TRIGGERED_TEMPLATE_NAME))
|
||||
.put(IndexTemplateMetaData.builder(WatcherIndexTemplateRegistry.WATCHES_TEMPLATE_NAME))
|
||||
.build())
|
||||
.build();
|
||||
when(watcherService.validate(clusterState)).thenReturn(true);
|
||||
when(watcherService.state()).thenReturn(WatcherState.STOPPED);
|
||||
|
@ -413,7 +435,14 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
|
|||
.add(newNode("oldNode", VersionUtils.randomVersionBetween(random(), Version.V_5_5_0, Version.V_6_0_0_alpha2)))
|
||||
.build();
|
||||
|
||||
ClusterState state = ClusterState.builder(new ClusterName("my-cluster")).nodes(nodes).build();
|
||||
ClusterState state = ClusterState.builder(new ClusterName("my-cluster"))
|
||||
.nodes(nodes)
|
||||
.metaData(MetaData.builder()
|
||||
.put(IndexTemplateMetaData.builder(WatcherIndexTemplateRegistry.HISTORY_TEMPLATE_NAME))
|
||||
.put(IndexTemplateMetaData.builder(WatcherIndexTemplateRegistry.TRIGGERED_TEMPLATE_NAME))
|
||||
.put(IndexTemplateMetaData.builder(WatcherIndexTemplateRegistry.WATCHES_TEMPLATE_NAME))
|
||||
.build())
|
||||
.build();
|
||||
when(watcherService.validate(eq(state))).thenReturn(true);
|
||||
when(watcherService.state()).thenReturn(WatcherState.STOPPED);
|
||||
|
||||
|
@ -421,6 +450,36 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
|
|||
verify(watcherService).start(any(ClusterState.class));
|
||||
}
|
||||
|
||||
public void testWatcherServiceDoesNotStartIfIndexTemplatesAreMissing() throws Exception {
|
||||
DiscoveryNodes nodes = new DiscoveryNodes.Builder()
|
||||
.masterNodeId("node_1").localNodeId("node_1")
|
||||
.add(newNode("node_1"))
|
||||
.build();
|
||||
|
||||
MetaData.Builder metaDataBuilder = MetaData.builder();
|
||||
boolean isHistoryTemplateAdded = randomBoolean();
|
||||
if (isHistoryTemplateAdded) {
|
||||
metaDataBuilder.put(IndexTemplateMetaData.builder(WatcherIndexTemplateRegistry.HISTORY_TEMPLATE_NAME));
|
||||
}
|
||||
boolean isTriggeredTemplateAdded = randomBoolean();
|
||||
if (isTriggeredTemplateAdded) {
|
||||
metaDataBuilder.put(IndexTemplateMetaData.builder(WatcherIndexTemplateRegistry.TRIGGERED_TEMPLATE_NAME));
|
||||
}
|
||||
boolean isWatchesTemplateAdded = randomBoolean();
|
||||
if (isWatchesTemplateAdded) {
|
||||
// ensure not all templates are added, otherwise life cycle service would start
|
||||
if ((isHistoryTemplateAdded || isTriggeredTemplateAdded) == false) {
|
||||
metaDataBuilder.put(IndexTemplateMetaData.builder(WatcherIndexTemplateRegistry.WATCHES_TEMPLATE_NAME));
|
||||
}
|
||||
}
|
||||
ClusterState state = ClusterState.builder(new ClusterName("my-cluster")).nodes(nodes).metaData(metaDataBuilder).build();
|
||||
when(watcherService.validate(eq(state))).thenReturn(true);
|
||||
when(watcherService.state()).thenReturn(WatcherState.STOPPED);
|
||||
|
||||
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", state, state));
|
||||
verify(watcherService, times(0)).start(any(ClusterState.class));
|
||||
}
|
||||
|
||||
private static DiscoveryNode newNode(String nodeName) {
|
||||
return newNode(nodeName, Version.CURRENT);
|
||||
}
|
||||
|
|
|
@ -13,13 +13,13 @@ import org.elasticsearch.client.AdminClient;
|
|||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.IndicesAdminClient;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
|
@ -36,6 +36,7 @@ import java.util.List;
|
|||
|
||||
import static org.elasticsearch.mock.orig.Mockito.verify;
|
||||
import static org.elasticsearch.mock.orig.Mockito.when;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyObject;
|
||||
import static org.mockito.Matchers.eq;
|
||||
|
@ -51,8 +52,6 @@ public class WatcherIndexTemplateRegistryTests extends ESTestCase {
|
|||
|
||||
@Before
|
||||
public void createRegistryAndClient() {
|
||||
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, Collections.emptySet());
|
||||
|
||||
ThreadPool threadPool = mock(ThreadPool.class);
|
||||
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
|
||||
when(threadPool.generic()).thenReturn(EsExecutors.newDirectExecutorService());
|
||||
|
@ -108,6 +107,25 @@ public class WatcherIndexTemplateRegistryTests extends ESTestCase {
|
|||
verify(client, times(4)).execute(anyObject(), argumentCaptor.capture(), anyObject());
|
||||
}
|
||||
|
||||
public void testThatTemplatesExist() {
|
||||
assertThat(WatcherIndexTemplateRegistry.validate(createClusterState(".watch-history")), is(false));
|
||||
assertThat(WatcherIndexTemplateRegistry.validate(createClusterState(".watch-history", ".triggered_watches", ".watches")),
|
||||
is(false));
|
||||
assertThat(WatcherIndexTemplateRegistry.validate(createClusterState(WatcherIndexTemplateRegistry.HISTORY_TEMPLATE_NAME,
|
||||
".triggered_watches", ".watches")), is(true));
|
||||
assertThat(WatcherIndexTemplateRegistry.validate(createClusterState(WatcherIndexTemplateRegistry.HISTORY_TEMPLATE_NAME,
|
||||
".triggered_watches", ".watches", "whatever", "else")), is(true));
|
||||
}
|
||||
|
||||
private ClusterState createClusterState(String ... existingTemplates) {
|
||||
MetaData.Builder metaDataBuilder = MetaData.builder();
|
||||
for (String templateName : existingTemplates) {
|
||||
metaDataBuilder.put(IndexTemplateMetaData.builder(templateName));
|
||||
}
|
||||
|
||||
return ClusterState.builder(new ClusterName("foo")).metaData(metaDataBuilder.build()).build();
|
||||
}
|
||||
|
||||
private static class TestPutIndexTemplateResponse extends PutIndexTemplateResponse {
|
||||
TestPutIndexTemplateResponse(boolean acknowledged) {
|
||||
super(acknowledged);
|
||||
|
|
Loading…
Reference in New Issue