diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutChecker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutChecker.java index a943e7b7a73..99ce19e3ad1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutChecker.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutChecker.java @@ -84,7 +84,6 @@ public class TimeoutChecker implements Closeable { * @throws ElasticsearchTimeoutException If the operation is found to have taken longer than the permitted time. */ public void check(String where) { - if (timeoutExceeded) { throw new ElasticsearchTimeoutException("Aborting " + operation + " during [" + where + "] as it has taken longer than the timeout of [" + timeout + "]"); @@ -101,7 +100,6 @@ public class TimeoutChecker implements Closeable { * @throws ElasticsearchTimeoutException If the operation is found to have taken longer than the permitted time. */ public Map grokCaptures(Grok grok, String text, String where) { - try { return grok.captures(text); } finally { @@ -137,12 +135,15 @@ public class TimeoutChecker implements Closeable { } @Override - public void register(Matcher matcher) { + public synchronized void register(Matcher matcher) { WatchDogEntry value = registry.get(Thread.currentThread()); if (value != null) { boolean wasFalse = value.registered.compareAndSet(false, true); assert wasFalse; value.matchers.add(matcher); + if (value.isTimedOut()) { + matcher.interrupt(); + } } } @@ -167,8 +168,9 @@ public class TimeoutChecker implements Closeable { assert previousValue != null; } - void interruptLongRunningThreadIfRegistered(Thread thread) { + synchronized void interruptLongRunningThreadIfRegistered(Thread thread) { WatchDogEntry value = registry.get(thread); + value.timedOut(); if (value.registered.get()) { for (Matcher matcher : value.matchers) { matcher.interrupt(); @@ -181,12 +183,21 @@ public class TimeoutChecker implements Closeable { final TimeValue timeout; final AtomicBoolean registered; final Collection matchers; + boolean timedOut; WatchDogEntry(TimeValue timeout) { this.timeout = timeout; this.registered = new AtomicBoolean(false); this.matchers = new CopyOnWriteArrayList<>(); } + + private void timedOut() { + timedOut = true; + } + + private boolean isTimedOut() { + return timedOut; + } } } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutCheckerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutCheckerTests.java index 34c9a047a99..2666bcc44f5 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutCheckerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/TimeoutCheckerTests.java @@ -59,21 +59,19 @@ public class TimeoutCheckerTests extends FileStructureTestCase { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/48861") public void testWatchdog() throws Exception { - TimeValue timeout = TimeValue.timeValueMillis(500); + final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(10, 500)); try (TimeoutChecker timeoutChecker = new TimeoutChecker("watchdog test", timeout, scheduler)) { - TimeoutChecker.TimeoutCheckerWatchdog watchdog = (TimeoutChecker.TimeoutCheckerWatchdog) TimeoutChecker.watchdog; - + final TimeoutChecker.TimeoutCheckerWatchdog watchdog = (TimeoutChecker.TimeoutCheckerWatchdog) TimeoutChecker.watchdog; Matcher matcher = mock(Matcher.class); - TimeoutChecker.watchdog.register(matcher); + watchdog.register(matcher); assertThat(watchdog.registry.get(Thread.currentThread()).matchers.size(), equalTo(1)); try { assertBusy(() -> { verify(matcher).interrupt(); }); } finally { - TimeoutChecker.watchdog.unregister(matcher); + watchdog.unregister(matcher); assertThat(watchdog.registry.get(Thread.currentThread()).matchers.size(), equalTo(0)); } }