Ensure that .watcher-history-11* template is in installed prior to use (#56734)
WatcherIndexTemplateRegistry as of https://github.com/elastic/elasticsearch/pull/52962 requires all nodes to be on 7.7.0 before it allows the version 11 index template to be installed. While in a mixed cluster, nothing prevents Watcher from running on the new host before the all of the nodes are on 7.7.0. This will result in the .watcher-history-11* index without the proper mappings. Without the proper mapping a single document (for a large watch) can exceed the default 1000 field limit and cause error to show in the logs. This commit ensures the same logic for writing to the index is applied as for installing the template. In a mixed cluster, the `10` index template will continue to be written. Only once all of nodes are on 7.7.0+ will the `11` index template be installed and used. closes #56732
This commit is contained in:
parent
e492c23944
commit
813609b47c
|
@ -154,7 +154,7 @@ public abstract class IndexTemplateRegistry implements ClusterStateListener {
|
|||
if (creationCheck.compareAndSet(false, true)) {
|
||||
IndexTemplateMetadata currentTemplate = state.metadata().getTemplates().get(templateName);
|
||||
if (Objects.isNull(currentTemplate)) {
|
||||
logger.debug("adding index template [{}] for [{}], because it doesn't exist", templateName, getOrigin());
|
||||
logger.info("adding index template [{}] for [{}], because it doesn't exist", templateName, getOrigin());
|
||||
putTemplate(newTemplate, creationCheck);
|
||||
} else if (Objects.isNull(currentTemplate.getVersion()) || newTemplate.getVersion() > currentTemplate.getVersion()) {
|
||||
// IndexTemplateConfig now enforces templates contain a `version` property, so if the template doesn't have one we can
|
||||
|
|
|
@ -5,6 +5,8 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.core.watcher.history;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.common.time.DateFormatter;
|
||||
import org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField;
|
||||
|
||||
|
@ -14,12 +16,18 @@ public final class HistoryStoreField {
|
|||
|
||||
public static final String INDEX_PREFIX = ".watcher-history-";
|
||||
public static final String INDEX_PREFIX_WITH_TEMPLATE = INDEX_PREFIX + WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION + "-";
|
||||
public static final String INDEX_PREFIX_WITH_TEMPLATE_10 = INDEX_PREFIX +
|
||||
WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION_10 + "-";
|
||||
private static final DateFormatter indexTimeFormat = DateFormatter.forPattern("yyyy.MM.dd");
|
||||
|
||||
/**
|
||||
* Calculates the correct history index name for a given time
|
||||
*/
|
||||
public static String getHistoryIndexNameForTime(ZonedDateTime time) {
|
||||
return INDEX_PREFIX_WITH_TEMPLATE + indexTimeFormat.format(time);
|
||||
public static String getHistoryIndexNameForTime(ZonedDateTime time, ClusterState state) {
|
||||
if (state == null || state.nodes().getMinNodeVersion().onOrAfter(Version.V_7_7_0)) {
|
||||
return INDEX_PREFIX_WITH_TEMPLATE + indexTimeFormat.format(time);
|
||||
} else {
|
||||
return INDEX_PREFIX_WITH_TEMPLATE_10 + indexTimeFormat.format(time);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,8 +18,9 @@ public final class WatcherIndexTemplateRegistryField {
|
|||
// version 11: watch history indices are hidden
|
||||
// Note: if you change this, also inform the kibana team around the watcher-ui
|
||||
public static final int INDEX_TEMPLATE_VERSION = 11;
|
||||
public static final int INDEX_TEMPLATE_VERSION_10 = 10;
|
||||
public static final String HISTORY_TEMPLATE_NAME = ".watch-history-" + INDEX_TEMPLATE_VERSION;
|
||||
public static final String HISTORY_TEMPLATE_NAME_10 = ".watch-history-10";
|
||||
public static final String HISTORY_TEMPLATE_NAME_10 = ".watch-history-" + INDEX_TEMPLATE_VERSION_10;
|
||||
public static final String HISTORY_TEMPLATE_NAME_NO_ILM = ".watch-history-no-ilm-" + INDEX_TEMPLATE_VERSION;
|
||||
public static final String HISTORY_TEMPLATE_NAME_NO_ILM_10 = ".watch-history-no-ilm-10";
|
||||
public static final String TRIGGERED_TEMPLATE_NAME = ".triggered_watches";
|
||||
|
|
|
@ -1395,7 +1395,7 @@ public class ReservedRolesStoreTests extends ESTestCase {
|
|||
assertThat(role.indices().allowedIndicesMatcher(IndexAction.NAME).test("foo"), is(false));
|
||||
|
||||
ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
|
||||
String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(now);
|
||||
String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(now, null);
|
||||
for (String index : new String[]{ Watch.INDEX, historyIndex, TriggeredWatchStoreField.INDEX_NAME }) {
|
||||
assertOnlyReadAllowed(role, index);
|
||||
}
|
||||
|
@ -1429,7 +1429,7 @@ public class ReservedRolesStoreTests extends ESTestCase {
|
|||
assertThat(role.indices().allowedIndicesMatcher(IndexAction.NAME).test(TriggeredWatchStoreField.INDEX_NAME), is(false));
|
||||
|
||||
ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
|
||||
String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(now);
|
||||
String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(now, null);
|
||||
for (String index : new String[]{ Watch.INDEX, historyIndex }) {
|
||||
assertOnlyReadAllowed(role, index);
|
||||
}
|
||||
|
|
|
@ -386,7 +386,7 @@ public class Watcher extends Plugin implements SystemIndexPlugin, ScriptPlugin,
|
|||
.setConcurrentRequests(SETTING_BULK_CONCURRENT_REQUESTS.get(settings))
|
||||
.build();
|
||||
|
||||
HistoryStore historyStore = new HistoryStore(bulkProcessor);
|
||||
HistoryStore historyStore = new HistoryStore(bulkProcessor, clusterService::state);
|
||||
|
||||
// schedulers
|
||||
final Set<Schedule.Parser> scheduleParsers = new HashSet<>();
|
||||
|
@ -623,14 +623,14 @@ public class Watcher extends Plugin implements SystemIndexPlugin, ScriptPlugin,
|
|||
indices.add(".watches");
|
||||
indices.add(".triggered_watches");
|
||||
ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
|
||||
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now));
|
||||
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusDays(1)));
|
||||
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(1)));
|
||||
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(2)));
|
||||
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(3)));
|
||||
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(4)));
|
||||
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(5)));
|
||||
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(6)));
|
||||
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now, null));
|
||||
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusDays(1), null));
|
||||
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(1), null));
|
||||
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(2), null));
|
||||
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(3), null));
|
||||
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(4), null));
|
||||
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(5), null));
|
||||
indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(6), null));
|
||||
for (String index : indices) {
|
||||
boolean matched = false;
|
||||
for (String match : matches) {
|
||||
|
|
|
@ -452,7 +452,7 @@ public class ExecutionService {
|
|||
* Any existing watchRecord will be overwritten.
|
||||
*/
|
||||
private void forcePutHistory(WatchRecord watchRecord) {
|
||||
String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime());
|
||||
String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime(), clusterService.state());
|
||||
try {
|
||||
try (XContentBuilder builder = XContentFactory.jsonBuilder();
|
||||
ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(WATCHER_ORIGIN)) {
|
||||
|
|
|
@ -31,9 +31,11 @@ public class HistoryStore {
|
|||
private static final Logger logger = LogManager.getLogger(HistoryStore.class);
|
||||
|
||||
private final BulkProcessor bulkProcessor;
|
||||
private final Supplier<ClusterState> clusterStateSupplier;
|
||||
|
||||
public HistoryStore(BulkProcessor bulkProcessor) {
|
||||
public HistoryStore(BulkProcessor bulkProcessor, Supplier<ClusterState> clusterStateSupplier) {
|
||||
this.bulkProcessor = bulkProcessor;
|
||||
this.clusterStateSupplier = clusterStateSupplier;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -41,7 +43,7 @@ public class HistoryStore {
|
|||
* If the specified watchRecord already was stored this call will fail with a version conflict.
|
||||
*/
|
||||
public void put(WatchRecord watchRecord) throws Exception {
|
||||
String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime());
|
||||
String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime(), clusterStateSupplier.get());
|
||||
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
|
||||
watchRecord.toXContent(builder, WatcherParams.HIDE_SECRETS);
|
||||
|
||||
|
@ -58,7 +60,7 @@ public class HistoryStore {
|
|||
* Any existing watchRecord will be overwritten.
|
||||
*/
|
||||
public void forcePut(WatchRecord watchRecord) {
|
||||
String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime());
|
||||
String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime(), clusterStateSupplier.get());
|
||||
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
|
||||
watchRecord.toXContent(builder, WatcherParams.HIDE_SECRETS);
|
||||
|
||||
|
@ -78,7 +80,7 @@ public class HistoryStore {
|
|||
* @return true, if history store is ready to be started
|
||||
*/
|
||||
public static boolean validate(ClusterState state) {
|
||||
String currentIndex = HistoryStoreField.getHistoryIndexNameForTime(ZonedDateTime.now(ZoneOffset.UTC));
|
||||
String currentIndex = HistoryStoreField.getHistoryIndexNameForTime(ZonedDateTime.now(ZoneOffset.UTC), state);
|
||||
IndexMetadata indexMetadata = WatchStoreUtils.getConcreteIndex(currentIndex, state.metadata());
|
||||
return indexMetadata == null || (indexMetadata.getState() == IndexMetadata.State.OPEN &&
|
||||
state.routingTable().index(indexMetadata.getIndex()).allPrimaryShardsActive());
|
||||
|
|
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.watcher.history;
|
|||
|
||||
import org.apache.http.HttpStatus;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.DocWriteRequest.OpType;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
|
@ -16,6 +17,8 @@ import org.elasticsearch.action.bulk.BulkResponse;
|
|||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.settings.MockSecureSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
|
@ -43,11 +46,13 @@ import org.mockito.ArgumentCaptor;
|
|||
import java.time.Instant;
|
||||
import java.time.ZoneOffset;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.Arrays;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.elasticsearch.xpack.core.watcher.history.HistoryStoreField.getHistoryIndexNameForTime;
|
||||
import static org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION;
|
||||
import static org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION_10;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
|
@ -63,24 +68,30 @@ public class HistoryStoreTests extends ESTestCase {
|
|||
|
||||
private HistoryStore historyStore;
|
||||
private Client client;
|
||||
private ClusterState clusterState;
|
||||
private DiscoveryNodes discoveryNodes;
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
Settings settings = Settings.builder().put("node.name", randomAlphaOfLength(10)).build();
|
||||
client = mock(Client.class);
|
||||
clusterState = mock(ClusterState.class);
|
||||
discoveryNodes = mock(DiscoveryNodes.class);
|
||||
ThreadPool threadPool = mock(ThreadPool.class);
|
||||
when(client.threadPool()).thenReturn(threadPool);
|
||||
when(client.settings()).thenReturn(settings);
|
||||
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(settings));
|
||||
when(clusterState.nodes()).thenReturn(discoveryNodes);
|
||||
when(discoveryNodes.getMinNodeVersion()).thenReturn(randomFrom(Arrays.asList(Version.V_7_0_0, Version.V_7_7_0)));
|
||||
BulkProcessor.Listener listener = mock(BulkProcessor.Listener.class);
|
||||
BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulk, listener).setConcurrentRequests(0).setBulkActions(1).build();
|
||||
historyStore = new HistoryStore(bulkProcessor);
|
||||
historyStore = new HistoryStore(bulkProcessor, () -> clusterState);
|
||||
}
|
||||
|
||||
public void testPut() throws Exception {
|
||||
ZonedDateTime now = Instant.ofEpochMilli(0).atZone(ZoneOffset.UTC);
|
||||
Wid wid = new Wid("_name", now);
|
||||
String index = getHistoryIndexNameForTime(now);
|
||||
String index = getHistoryIndexNameForTime(now, clusterState);
|
||||
ScheduleTriggerEvent event = new ScheduleTriggerEvent(wid.watchId(), now, now);
|
||||
WatchRecord watchRecord = new WatchRecord.MessageWatchRecord(wid, event, ExecutionState.EXECUTED, null, randomAlphaOfLength(10));
|
||||
|
||||
|
@ -105,15 +116,11 @@ public class HistoryStoreTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testIndexNameGeneration() {
|
||||
String indexTemplateVersion = Integer.toString(INDEX_TEMPLATE_VERSION);
|
||||
assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli((long) 0).atZone(ZoneOffset.UTC)),
|
||||
equalTo(".watcher-history-"+ indexTemplateVersion +"-1970.01.01"));
|
||||
assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(100000000000L).atZone(ZoneOffset.UTC)),
|
||||
equalTo(".watcher-history-" + indexTemplateVersion + "-1973.03.03"));
|
||||
assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(1416582852000L).atZone(ZoneOffset.UTC)),
|
||||
equalTo(".watcher-history-" + indexTemplateVersion + "-2014.11.21"));
|
||||
assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(2833165811000L).atZone(ZoneOffset.UTC)),
|
||||
equalTo(".watcher-history-" + indexTemplateVersion + "-2059.10.12"));
|
||||
when(discoveryNodes.getMinNodeVersion()).thenReturn(Version.V_7_7_0);
|
||||
assertHistoryIndexName(Integer.toString(INDEX_TEMPLATE_VERSION));
|
||||
|
||||
when(discoveryNodes.getMinNodeVersion()).thenReturn(Version.V_7_0_0);
|
||||
assertHistoryIndexName(Integer.toString(INDEX_TEMPLATE_VERSION_10));
|
||||
}
|
||||
|
||||
public void testStoreWithHideSecrets() throws Exception {
|
||||
|
@ -179,4 +186,15 @@ public class HistoryStoreTests extends ESTestCase {
|
|||
assertThat(indexedJson, containsString(username));
|
||||
assertThat(indexedJson, not(containsString(password)));
|
||||
}
|
||||
|
||||
private void assertHistoryIndexName(String indexTemplateVersion){
|
||||
assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli((long) 0).atZone(ZoneOffset.UTC), clusterState),
|
||||
equalTo(".watcher-history-" + indexTemplateVersion + "-1970.01.01"));
|
||||
assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(100000000000L).atZone(ZoneOffset.UTC), clusterState),
|
||||
equalTo(".watcher-history-" + indexTemplateVersion + "-1973.03.03"));
|
||||
assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(1416582852000L).atZone(ZoneOffset.UTC), clusterState),
|
||||
equalTo(".watcher-history-" + indexTemplateVersion + "-2014.11.21"));
|
||||
assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(2833165811000L).atZone(ZoneOffset.UTC), clusterState),
|
||||
equalTo(".watcher-history-" + indexTemplateVersion + "-2059.10.12"));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -276,7 +276,7 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
|
|||
assertAcked(client().admin().indices().prepareCreate(triggeredWatchIndexName));
|
||||
}
|
||||
|
||||
String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(ZonedDateTime.now(ZoneOffset.UTC));
|
||||
String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(ZonedDateTime.now(ZoneOffset.UTC), null);
|
||||
assertAcked(client().admin().indices().prepareCreate(historyIndex));
|
||||
logger.info("creating watch history index [{}]", historyIndex);
|
||||
ensureGreen(historyIndex, watchIndexName, triggeredWatchIndexName);
|
||||
|
|
|
@ -78,7 +78,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
|
|||
Wid wid = new Wid("_id", now);
|
||||
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
|
||||
ExecutableCondition condition = InternalAlwaysCondition.INSTANCE;
|
||||
String index = HistoryStoreField.getHistoryIndexNameForTime(now);
|
||||
String index = HistoryStoreField.getHistoryIndexNameForTime(now, null);
|
||||
client().prepareIndex().setIndex(index).setId(wid.value())
|
||||
.setSource(jsonBuilder().startObject()
|
||||
.startObject(WatchRecord.TRIGGER_EVENT.getPreferredName())
|
||||
|
@ -309,7 +309,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
|
|||
}
|
||||
LocalDateTime localDateTime = LocalDateTime.of(2015, 11, 5, 0, 0, 0, 0);
|
||||
ZonedDateTime triggeredTime = ZonedDateTime.of(localDateTime,ZoneOffset.UTC);
|
||||
final String watchRecordIndex = HistoryStoreField.getHistoryIndexNameForTime(triggeredTime);
|
||||
final String watchRecordIndex = HistoryStoreField.getHistoryIndexNameForTime(triggeredTime, null);
|
||||
|
||||
logger.info("Stopping watcher");
|
||||
stopWatcher();
|
||||
|
|
Loading…
Reference in New Issue