From 813609b47c5b9916be0acb5ff56a89a504f5185b Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Fri, 15 May 2020 16:29:04 -0500 Subject: [PATCH] Ensure that .watcher-history-11* template is in installed prior to use (#56734) WatcherIndexTemplateRegistry as of https://github.com/elastic/elasticsearch/pull/52962 requires all nodes to be on 7.7.0 before it allows the version 11 index template to be installed. While in a mixed cluster, nothing prevents Watcher from running on the new host before the all of the nodes are on 7.7.0. This will result in the .watcher-history-11* index without the proper mappings. Without the proper mapping a single document (for a large watch) can exceed the default 1000 field limit and cause error to show in the logs. This commit ensures the same logic for writing to the index is applied as for installing the template. In a mixed cluster, the `10` index template will continue to be written. Only once all of nodes are on 7.7.0+ will the `11` index template be installed and used. closes #56732 --- .../core/template/IndexTemplateRegistry.java | 2 +- .../watcher/history/HistoryStoreField.java | 12 +++++- .../WatcherIndexTemplateRegistryField.java | 3 +- .../authz/store/ReservedRolesStoreTests.java | 4 +- .../elasticsearch/xpack/watcher/Watcher.java | 18 ++++----- .../watcher/execution/ExecutionService.java | 2 +- .../xpack/watcher/history/HistoryStore.java | 10 +++-- .../watcher/history/HistoryStoreTests.java | 40 ++++++++++++++----- .../AbstractWatcherIntegrationTestCase.java | 2 +- .../test/integration/BootStrapTests.java | 4 +- 10 files changed, 63 insertions(+), 34 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/template/IndexTemplateRegistry.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/template/IndexTemplateRegistry.java index c0fd0c7c339..33ad3aca2e8 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/template/IndexTemplateRegistry.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/template/IndexTemplateRegistry.java @@ -154,7 +154,7 @@ public abstract class IndexTemplateRegistry implements ClusterStateListener { if (creationCheck.compareAndSet(false, true)) { IndexTemplateMetadata currentTemplate = state.metadata().getTemplates().get(templateName); if (Objects.isNull(currentTemplate)) { - logger.debug("adding index template [{}] for [{}], because it doesn't exist", templateName, getOrigin()); + logger.info("adding index template [{}] for [{}], because it doesn't exist", templateName, getOrigin()); putTemplate(newTemplate, creationCheck); } else if (Objects.isNull(currentTemplate.getVersion()) || newTemplate.getVersion() > currentTemplate.getVersion()) { // IndexTemplateConfig now enforces templates contain a `version` property, so if the template doesn't have one we can diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/history/HistoryStoreField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/history/HistoryStoreField.java index 99520337475..e682ab37108 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/history/HistoryStoreField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/history/HistoryStoreField.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.core.watcher.history; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField; @@ -14,12 +16,18 @@ public final class HistoryStoreField { public static final String INDEX_PREFIX = ".watcher-history-"; public static final String INDEX_PREFIX_WITH_TEMPLATE = INDEX_PREFIX + WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION + "-"; + public static final String INDEX_PREFIX_WITH_TEMPLATE_10 = INDEX_PREFIX + + WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION_10 + "-"; private static final DateFormatter indexTimeFormat = DateFormatter.forPattern("yyyy.MM.dd"); /** * Calculates the correct history index name for a given time */ - public static String getHistoryIndexNameForTime(ZonedDateTime time) { - return INDEX_PREFIX_WITH_TEMPLATE + indexTimeFormat.format(time); + public static String getHistoryIndexNameForTime(ZonedDateTime time, ClusterState state) { + if (state == null || state.nodes().getMinNodeVersion().onOrAfter(Version.V_7_7_0)) { + return INDEX_PREFIX_WITH_TEMPLATE + indexTimeFormat.format(time); + } else { + return INDEX_PREFIX_WITH_TEMPLATE_10 + indexTimeFormat.format(time); + } } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/support/WatcherIndexTemplateRegistryField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/support/WatcherIndexTemplateRegistryField.java index 4a2524beecf..94fe152dc94 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/support/WatcherIndexTemplateRegistryField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/support/WatcherIndexTemplateRegistryField.java @@ -18,8 +18,9 @@ public final class WatcherIndexTemplateRegistryField { // version 11: watch history indices are hidden // Note: if you change this, also inform the kibana team around the watcher-ui public static final int INDEX_TEMPLATE_VERSION = 11; + public static final int INDEX_TEMPLATE_VERSION_10 = 10; public static final String HISTORY_TEMPLATE_NAME = ".watch-history-" + INDEX_TEMPLATE_VERSION; - public static final String HISTORY_TEMPLATE_NAME_10 = ".watch-history-10"; + public static final String HISTORY_TEMPLATE_NAME_10 = ".watch-history-" + INDEX_TEMPLATE_VERSION_10; public static final String HISTORY_TEMPLATE_NAME_NO_ILM = ".watch-history-no-ilm-" + INDEX_TEMPLATE_VERSION; public static final String HISTORY_TEMPLATE_NAME_NO_ILM_10 = ".watch-history-no-ilm-10"; public static final String TRIGGERED_TEMPLATE_NAME = ".triggered_watches"; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java index 803bc234b5a..2624787bf1f 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java @@ -1395,7 +1395,7 @@ public class ReservedRolesStoreTests extends ESTestCase { assertThat(role.indices().allowedIndicesMatcher(IndexAction.NAME).test("foo"), is(false)); ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC); - String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(now); + String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(now, null); for (String index : new String[]{ Watch.INDEX, historyIndex, TriggeredWatchStoreField.INDEX_NAME }) { assertOnlyReadAllowed(role, index); } @@ -1429,7 +1429,7 @@ public class ReservedRolesStoreTests extends ESTestCase { assertThat(role.indices().allowedIndicesMatcher(IndexAction.NAME).test(TriggeredWatchStoreField.INDEX_NAME), is(false)); ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC); - String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(now); + String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(now, null); for (String index : new String[]{ Watch.INDEX, historyIndex }) { assertOnlyReadAllowed(role, index); } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java index 8378ce20453..bf136d90707 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java @@ -386,7 +386,7 @@ public class Watcher extends Plugin implements SystemIndexPlugin, ScriptPlugin, .setConcurrentRequests(SETTING_BULK_CONCURRENT_REQUESTS.get(settings)) .build(); - HistoryStore historyStore = new HistoryStore(bulkProcessor); + HistoryStore historyStore = new HistoryStore(bulkProcessor, clusterService::state); // schedulers final Set scheduleParsers = new HashSet<>(); @@ -623,14 +623,14 @@ public class Watcher extends Plugin implements SystemIndexPlugin, ScriptPlugin, indices.add(".watches"); indices.add(".triggered_watches"); ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC); - indices.add(HistoryStoreField.getHistoryIndexNameForTime(now)); - indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusDays(1))); - indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(1))); - indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(2))); - indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(3))); - indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(4))); - indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(5))); - indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(6))); + indices.add(HistoryStoreField.getHistoryIndexNameForTime(now, null)); + indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusDays(1), null)); + indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(1), null)); + indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(2), null)); + indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(3), null)); + indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(4), null)); + indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(5), null)); + indices.add(HistoryStoreField.getHistoryIndexNameForTime(now.plusMonths(6), null)); for (String index : indices) { boolean matched = false; for (String match : matches) { diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index 296c521771f..94a64cdb94c 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -452,7 +452,7 @@ public class ExecutionService { * Any existing watchRecord will be overwritten. */ private void forcePutHistory(WatchRecord watchRecord) { - String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime()); + String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime(), clusterService.state()); try { try (XContentBuilder builder = XContentFactory.jsonBuilder(); ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(WATCHER_ORIGIN)) { diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/history/HistoryStore.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/history/HistoryStore.java index 5147fb92154..ed43fd41719 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/history/HistoryStore.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/history/HistoryStore.java @@ -31,9 +31,11 @@ public class HistoryStore { private static final Logger logger = LogManager.getLogger(HistoryStore.class); private final BulkProcessor bulkProcessor; + private final Supplier clusterStateSupplier; - public HistoryStore(BulkProcessor bulkProcessor) { + public HistoryStore(BulkProcessor bulkProcessor, Supplier clusterStateSupplier) { this.bulkProcessor = bulkProcessor; + this.clusterStateSupplier = clusterStateSupplier; } /** @@ -41,7 +43,7 @@ public class HistoryStore { * If the specified watchRecord already was stored this call will fail with a version conflict. */ public void put(WatchRecord watchRecord) throws Exception { - String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime()); + String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime(), clusterStateSupplier.get()); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { watchRecord.toXContent(builder, WatcherParams.HIDE_SECRETS); @@ -58,7 +60,7 @@ public class HistoryStore { * Any existing watchRecord will be overwritten. */ public void forcePut(WatchRecord watchRecord) { - String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime()); + String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime(), clusterStateSupplier.get()); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { watchRecord.toXContent(builder, WatcherParams.HIDE_SECRETS); @@ -78,7 +80,7 @@ public class HistoryStore { * @return true, if history store is ready to be started */ public static boolean validate(ClusterState state) { - String currentIndex = HistoryStoreField.getHistoryIndexNameForTime(ZonedDateTime.now(ZoneOffset.UTC)); + String currentIndex = HistoryStoreField.getHistoryIndexNameForTime(ZonedDateTime.now(ZoneOffset.UTC), state); IndexMetadata indexMetadata = WatchStoreUtils.getConcreteIndex(currentIndex, state.metadata()); return indexMetadata == null || (indexMetadata.getState() == IndexMetadata.State.OPEN && state.routingTable().index(indexMetadata.getIndex()).allPrimaryShardsActive()); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java index 59aef01f180..bf25d887223 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.watcher.history; import org.apache.http.HttpStatus; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest.OpType; import org.elasticsearch.action.bulk.BulkItemResponse; @@ -16,6 +17,8 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -43,11 +46,13 @@ import org.mockito.ArgumentCaptor; import java.time.Instant; import java.time.ZoneOffset; import java.time.ZonedDateTime; +import java.util.Arrays; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; import static org.elasticsearch.xpack.core.watcher.history.HistoryStoreField.getHistoryIndexNameForTime; import static org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION; +import static org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION_10; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; @@ -63,24 +68,30 @@ public class HistoryStoreTests extends ESTestCase { private HistoryStore historyStore; private Client client; + private ClusterState clusterState; + private DiscoveryNodes discoveryNodes; @Before public void init() { Settings settings = Settings.builder().put("node.name", randomAlphaOfLength(10)).build(); client = mock(Client.class); + clusterState = mock(ClusterState.class); + discoveryNodes = mock(DiscoveryNodes.class); ThreadPool threadPool = mock(ThreadPool.class); when(client.threadPool()).thenReturn(threadPool); when(client.settings()).thenReturn(settings); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(settings)); + when(clusterState.nodes()).thenReturn(discoveryNodes); + when(discoveryNodes.getMinNodeVersion()).thenReturn(randomFrom(Arrays.asList(Version.V_7_0_0, Version.V_7_7_0))); BulkProcessor.Listener listener = mock(BulkProcessor.Listener.class); BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulk, listener).setConcurrentRequests(0).setBulkActions(1).build(); - historyStore = new HistoryStore(bulkProcessor); + historyStore = new HistoryStore(bulkProcessor, () -> clusterState); } public void testPut() throws Exception { ZonedDateTime now = Instant.ofEpochMilli(0).atZone(ZoneOffset.UTC); Wid wid = new Wid("_name", now); - String index = getHistoryIndexNameForTime(now); + String index = getHistoryIndexNameForTime(now, clusterState); ScheduleTriggerEvent event = new ScheduleTriggerEvent(wid.watchId(), now, now); WatchRecord watchRecord = new WatchRecord.MessageWatchRecord(wid, event, ExecutionState.EXECUTED, null, randomAlphaOfLength(10)); @@ -105,15 +116,11 @@ public class HistoryStoreTests extends ESTestCase { } public void testIndexNameGeneration() { - String indexTemplateVersion = Integer.toString(INDEX_TEMPLATE_VERSION); - assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli((long) 0).atZone(ZoneOffset.UTC)), - equalTo(".watcher-history-"+ indexTemplateVersion +"-1970.01.01")); - assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(100000000000L).atZone(ZoneOffset.UTC)), - equalTo(".watcher-history-" + indexTemplateVersion + "-1973.03.03")); - assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(1416582852000L).atZone(ZoneOffset.UTC)), - equalTo(".watcher-history-" + indexTemplateVersion + "-2014.11.21")); - assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(2833165811000L).atZone(ZoneOffset.UTC)), - equalTo(".watcher-history-" + indexTemplateVersion + "-2059.10.12")); + when(discoveryNodes.getMinNodeVersion()).thenReturn(Version.V_7_7_0); + assertHistoryIndexName(Integer.toString(INDEX_TEMPLATE_VERSION)); + + when(discoveryNodes.getMinNodeVersion()).thenReturn(Version.V_7_0_0); + assertHistoryIndexName(Integer.toString(INDEX_TEMPLATE_VERSION_10)); } public void testStoreWithHideSecrets() throws Exception { @@ -179,4 +186,15 @@ public class HistoryStoreTests extends ESTestCase { assertThat(indexedJson, containsString(username)); assertThat(indexedJson, not(containsString(password))); } + + private void assertHistoryIndexName(String indexTemplateVersion){ + assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli((long) 0).atZone(ZoneOffset.UTC), clusterState), + equalTo(".watcher-history-" + indexTemplateVersion + "-1970.01.01")); + assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(100000000000L).atZone(ZoneOffset.UTC), clusterState), + equalTo(".watcher-history-" + indexTemplateVersion + "-1973.03.03")); + assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(1416582852000L).atZone(ZoneOffset.UTC), clusterState), + equalTo(".watcher-history-" + indexTemplateVersion + "-2014.11.21")); + assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(2833165811000L).atZone(ZoneOffset.UTC), clusterState), + equalTo(".watcher-history-" + indexTemplateVersion + "-2059.10.12")); + } } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java index dff1f9d3957..322de281059 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java @@ -276,7 +276,7 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase assertAcked(client().admin().indices().prepareCreate(triggeredWatchIndexName)); } - String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(ZonedDateTime.now(ZoneOffset.UTC)); + String historyIndex = HistoryStoreField.getHistoryIndexNameForTime(ZonedDateTime.now(ZoneOffset.UTC), null); assertAcked(client().admin().indices().prepareCreate(historyIndex)); logger.info("creating watch history index [{}]", historyIndex); ensureGreen(historyIndex, watchIndexName, triggeredWatchIndexName); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java index 56488b12cb3..b33e26f0c61 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java @@ -78,7 +78,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase { Wid wid = new Wid("_id", now); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); ExecutableCondition condition = InternalAlwaysCondition.INSTANCE; - String index = HistoryStoreField.getHistoryIndexNameForTime(now); + String index = HistoryStoreField.getHistoryIndexNameForTime(now, null); client().prepareIndex().setIndex(index).setId(wid.value()) .setSource(jsonBuilder().startObject() .startObject(WatchRecord.TRIGGER_EVENT.getPreferredName()) @@ -309,7 +309,7 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase { } LocalDateTime localDateTime = LocalDateTime.of(2015, 11, 5, 0, 0, 0, 0); ZonedDateTime triggeredTime = ZonedDateTime.of(localDateTime,ZoneOffset.UTC); - final String watchRecordIndex = HistoryStoreField.getHistoryIndexNameForTime(triggeredTime); + final String watchRecordIndex = HistoryStoreField.getHistoryIndexNameForTime(triggeredTime, null); logger.info("Stopping watcher"); stopWatcher();