Watcher: Create templates on nodes newer than the master (elastic/x-pack-elasticsearch#2950)
This problem was introduced due to distributed watch execution. When a node newer than the master node joins the cluster and gets a .watches shard assigned it is supposed to start watcher. However when a new version of the watch history template is part of that new node (and we might increase that version anytime), this template does not get installed, because only the master node is updating watcher templates. This commit checks if the local node version is higher than the master node version and then also puts missing templates. Currently this is done for all watcher templates, not only the watcher history. relates elastic/x-pack-elasticsearch#2944 Original commit: elastic/x-pack-elasticsearch@4960231ea7
This commit is contained in:
parent
1933fc71f3
commit
3d5fb54522
|
@ -12,6 +12,7 @@ import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRespo
|
|||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -77,13 +78,15 @@ public class WatcherIndexTemplateRegistry extends AbstractComponent implements C
|
|||
return;
|
||||
}
|
||||
|
||||
if (event.localNodeMaster() == false) {
|
||||
// Only the node that runs or will run Watcher should update the templates. Otherwise unnecessary put template
|
||||
// calls would happen
|
||||
return;
|
||||
}
|
||||
// if this node is newer than the master node, we probably need to add the history template, which might be newer than the
|
||||
// history template the master node has, so we need potentially add new templates despite being not the master node
|
||||
DiscoveryNode localNode = event.state().getNodes().getLocalNode();
|
||||
DiscoveryNode masterNode = event.state().getNodes().getMasterNode();
|
||||
boolean localNodeVersionAfterMaster = localNode.getVersion().after(masterNode.getVersion());
|
||||
|
||||
addTemplatesIfMissing(state);
|
||||
if (event.localNodeMaster() || localNodeVersionAfterMaster) {
|
||||
addTemplatesIfMissing(state);
|
||||
}
|
||||
}
|
||||
|
||||
private void addTemplatesIfMissing(ClusterState state) {
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.watcher.support;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateAction;
|
||||
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
|
||||
|
@ -18,6 +19,8 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -44,6 +47,7 @@ import static org.mockito.Matchers.same;
|
|||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verifyZeroInteractions;
|
||||
|
||||
public class WatcherIndexTemplateRegistryTests extends ESTestCase {
|
||||
|
||||
|
@ -73,36 +77,18 @@ public class WatcherIndexTemplateRegistryTests extends ESTestCase {
|
|||
registry = new WatcherIndexTemplateRegistry(Settings.EMPTY, clusterService, threadPool, internalClient);
|
||||
}
|
||||
|
||||
private ClusterChangedEvent createClusterChangedEvent(List<String> existingTemplateNames) {
|
||||
ClusterChangedEvent event = mock(ClusterChangedEvent.class);
|
||||
when(event.localNodeMaster()).thenReturn(true);
|
||||
ClusterState cs = mock(ClusterState.class);
|
||||
ClusterBlocks clusterBlocks = mock(ClusterBlocks.class);
|
||||
when(clusterBlocks.hasGlobalBlock(eq(GatewayService.STATE_NOT_RECOVERED_BLOCK))).thenReturn(false);
|
||||
when(cs.blocks()).thenReturn(clusterBlocks);
|
||||
when(event.state()).thenReturn(cs);
|
||||
|
||||
MetaData metaData = mock(MetaData.class);
|
||||
ImmutableOpenMap.Builder<String, IndexTemplateMetaData> indexTemplates = ImmutableOpenMap.builder();
|
||||
for (String name : existingTemplateNames) {
|
||||
indexTemplates.put(name, mock(IndexTemplateMetaData.class));
|
||||
}
|
||||
|
||||
when(metaData.getTemplates()).thenReturn(indexTemplates.build());
|
||||
when(cs.metaData()).thenReturn(metaData);
|
||||
|
||||
return event;
|
||||
}
|
||||
|
||||
public void testThatNonExistingTemplatesAreAddedImmediately() {
|
||||
ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyList());
|
||||
DiscoveryNode node = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT);
|
||||
DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
|
||||
|
||||
ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyList(), nodes);
|
||||
registry.clusterChanged(event);
|
||||
ArgumentCaptor<PutIndexTemplateRequest> argumentCaptor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class);
|
||||
verify(client, times(3)).execute(anyObject(), argumentCaptor.capture(), anyObject());
|
||||
|
||||
// now delete one template from the cluster state and lets retry
|
||||
ClusterChangedEvent newEvent = createClusterChangedEvent(Arrays.asList(WatcherIndexTemplateRegistry.HISTORY_TEMPLATE_NAME,
|
||||
WatcherIndexTemplateRegistry.TRIGGERED_TEMPLATE_NAME));
|
||||
WatcherIndexTemplateRegistry.TRIGGERED_TEMPLATE_NAME), nodes);
|
||||
registry.clusterChanged(newEvent);
|
||||
verify(client, times(4)).execute(anyObject(), argumentCaptor.capture(), anyObject());
|
||||
}
|
||||
|
@ -117,6 +103,57 @@ public class WatcherIndexTemplateRegistryTests extends ESTestCase {
|
|||
".triggered_watches", ".watches", "whatever", "else")), is(true));
|
||||
}
|
||||
|
||||
// if a node is newer than the master node, the template needs to be applied as well
|
||||
// otherwise a rolling upgrade would not work as expected, when the node has a .watches shard on it
|
||||
public void testThatTemplatesAreAppliedOnNewerNodes() {
|
||||
DiscoveryNode localNode = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT);
|
||||
DiscoveryNode masterNode = new DiscoveryNode("master", ESTestCase.buildNewFakeTransportAddress(), Version.V_6_0_0);
|
||||
DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("master").add(localNode).add(masterNode).build();
|
||||
|
||||
ClusterChangedEvent event = createClusterChangedEvent(Arrays.asList(WatcherIndexTemplateRegistry.TRIGGERED_TEMPLATE_NAME,
|
||||
WatcherIndexTemplateRegistry.WATCHES_TEMPLATE_NAME, ".watch-history-6"), nodes);
|
||||
registry.clusterChanged(event);
|
||||
|
||||
ArgumentCaptor<PutIndexTemplateRequest> argumentCaptor = ArgumentCaptor.forClass(PutIndexTemplateRequest.class);
|
||||
verify(client, times(1)).execute(anyObject(), argumentCaptor.capture(), anyObject());
|
||||
assertThat(argumentCaptor.getValue().name(), is(WatcherIndexTemplateRegistry.HISTORY_TEMPLATE_NAME));
|
||||
}
|
||||
|
||||
public void testThatTemplatesAreNotAppliedOnSameVersionNodes() {
|
||||
DiscoveryNode localNode = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT);
|
||||
DiscoveryNode masterNode = new DiscoveryNode("master", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT);
|
||||
DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("master").add(localNode).add(masterNode).build();
|
||||
|
||||
ClusterChangedEvent event = createClusterChangedEvent(Arrays.asList(WatcherIndexTemplateRegistry.TRIGGERED_TEMPLATE_NAME,
|
||||
WatcherIndexTemplateRegistry.WATCHES_TEMPLATE_NAME, ".watch-history-6"), nodes);
|
||||
registry.clusterChanged(event);
|
||||
|
||||
verifyZeroInteractions(client);
|
||||
}
|
||||
|
||||
private ClusterChangedEvent createClusterChangedEvent(List<String> existingTemplateNames, DiscoveryNodes nodes) {
|
||||
ClusterChangedEvent event = mock(ClusterChangedEvent.class);
|
||||
when(event.localNodeMaster()).thenReturn(nodes.isLocalNodeElectedMaster());
|
||||
ClusterState cs = mock(ClusterState.class);
|
||||
ClusterBlocks clusterBlocks = mock(ClusterBlocks.class);
|
||||
when(clusterBlocks.hasGlobalBlock(eq(GatewayService.STATE_NOT_RECOVERED_BLOCK))).thenReturn(false);
|
||||
when(cs.blocks()).thenReturn(clusterBlocks);
|
||||
when(event.state()).thenReturn(cs);
|
||||
|
||||
when(cs.getNodes()).thenReturn(nodes);
|
||||
|
||||
MetaData metaData = mock(MetaData.class);
|
||||
ImmutableOpenMap.Builder<String, IndexTemplateMetaData> indexTemplates = ImmutableOpenMap.builder();
|
||||
for (String name : existingTemplateNames) {
|
||||
indexTemplates.put(name, mock(IndexTemplateMetaData.class));
|
||||
}
|
||||
|
||||
when(metaData.getTemplates()).thenReturn(indexTemplates.build());
|
||||
when(cs.metaData()).thenReturn(metaData);
|
||||
|
||||
return event;
|
||||
}
|
||||
|
||||
private ClusterState createClusterState(String ... existingTemplates) {
|
||||
MetaData.Builder metaDataBuilder = MetaData.builder();
|
||||
for (String templateName : existingTemplates) {
|
||||
|
|
Loading…
Reference in New Issue