From 4aa50ed34842ab2522c66e4fbf9d7daa9cfeb59c Mon Sep 17 00:00:00 2001 From: Tim Vernum Date: Wed, 20 Feb 2019 19:18:00 +1100 Subject: [PATCH] Resolve concurrency with watcher trigger service (#39164) The watcher trigger service could attempt to modify the perWatchStats map simultaneously from multiple threads. This would cause the internal state to become inconsistent, in particular the count() method may return an incorrect value for the number of watches. This changes replaces the implementation of the map with a ConcurrentHashMap so that its internal state remains consistent even when accessed from mutiple threads. Backport of: #39092 --- .../elasticsearch/xpack/watcher/trigger/TriggerService.java | 3 ++- .../org/elasticsearch/xpack/watcher/WatcherServiceTests.java | 2 ++ .../xpack/watcher/test/integration/BootStrapTests.java | 1 - 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerService.java index 4c27ec329bb..837acf097be 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerService.java @@ -19,6 +19,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; @@ -29,7 +30,7 @@ public class TriggerService { private final GroupedConsumer consumer = new GroupedConsumer(); private final Map engines; - private final Map perWatchStats = new HashMap<>(); + private final Map perWatchStats = new ConcurrentHashMap<>(); public TriggerService(Set engines) { Map builder = new HashMap<>(); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java index 3b8d844cc12..3d1fe78e27a 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java @@ -231,8 +231,10 @@ public class WatcherServiceTests extends ESTestCase { Trigger trigger = mock(Trigger.class); when(trigger.type()).thenReturn(engineType); + final String id = randomAlphaOfLengthBetween(3, 12); Watch watch = mock(Watch.class); when(watch.trigger()).thenReturn(trigger); + when(watch.id()).thenReturn(id); when(watch.condition()).thenReturn(InternalAlwaysCondition.INSTANCE); ExecutableNoneInput noneInput = new ExecutableNoneInput(); when(watch.input()).thenReturn(noneInput); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java index 6382909f96f..b15b14a186a 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java @@ -141,7 +141,6 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase { }); } - @AwaitsFix(bugUrl = "Supposedly fixed; https://github.com/elastic/x-pack-elasticsearch/issues/1915") public void testLoadExistingWatchesUponStartup() throws Exception { stopWatcher();