[ML] fixes testWatchdog test verifying matcher is interrupted on timeout (#62391) (#62447)

Constructing the timout checker FIRST and THEN registering the watcher allows the test to have a race condition.

The timeout value could be reached BEFORE the matcher is added. To prevent the matcher never being interrupted, a new timedOut value is added to the watcher thread entry. Then when a new matcher is registered, if the thread was previously timedout, we interrupt the matcher immediately.

closes #48861
This commit is contained in:
Benjamin Trent 2020-09-16 09:13:22 -04:00 committed by GitHub
parent a68e47ab1f
commit 341eeae6e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 19 additions and 10 deletions

View File

@ -84,7 +84,6 @@ public class TimeoutChecker implements Closeable {
* @throws ElasticsearchTimeoutException If the operation is found to have taken longer than the permitted time. * @throws ElasticsearchTimeoutException If the operation is found to have taken longer than the permitted time.
*/ */
public void check(String where) { public void check(String where) {
if (timeoutExceeded) { if (timeoutExceeded) {
throw new ElasticsearchTimeoutException("Aborting " + operation + " during [" + where + throw new ElasticsearchTimeoutException("Aborting " + operation + " during [" + where +
"] as it has taken longer than the timeout of [" + timeout + "]"); "] as it has taken longer than the timeout of [" + timeout + "]");
@ -101,7 +100,6 @@ public class TimeoutChecker implements Closeable {
* @throws ElasticsearchTimeoutException If the operation is found to have taken longer than the permitted time. * @throws ElasticsearchTimeoutException If the operation is found to have taken longer than the permitted time.
*/ */
public Map<String, Object> grokCaptures(Grok grok, String text, String where) { public Map<String, Object> grokCaptures(Grok grok, String text, String where) {
try { try {
return grok.captures(text); return grok.captures(text);
} finally { } finally {
@ -137,12 +135,15 @@ public class TimeoutChecker implements Closeable {
} }
@Override @Override
public void register(Matcher matcher) { public synchronized void register(Matcher matcher) {
WatchDogEntry value = registry.get(Thread.currentThread()); WatchDogEntry value = registry.get(Thread.currentThread());
if (value != null) { if (value != null) {
boolean wasFalse = value.registered.compareAndSet(false, true); boolean wasFalse = value.registered.compareAndSet(false, true);
assert wasFalse; assert wasFalse;
value.matchers.add(matcher); value.matchers.add(matcher);
if (value.isTimedOut()) {
matcher.interrupt();
}
} }
} }
@ -167,8 +168,9 @@ public class TimeoutChecker implements Closeable {
assert previousValue != null; assert previousValue != null;
} }
void interruptLongRunningThreadIfRegistered(Thread thread) { synchronized void interruptLongRunningThreadIfRegistered(Thread thread) {
WatchDogEntry value = registry.get(thread); WatchDogEntry value = registry.get(thread);
value.timedOut();
if (value.registered.get()) { if (value.registered.get()) {
for (Matcher matcher : value.matchers) { for (Matcher matcher : value.matchers) {
matcher.interrupt(); matcher.interrupt();
@ -181,12 +183,21 @@ public class TimeoutChecker implements Closeable {
final TimeValue timeout; final TimeValue timeout;
final AtomicBoolean registered; final AtomicBoolean registered;
final Collection<Matcher> matchers; final Collection<Matcher> matchers;
boolean timedOut;
WatchDogEntry(TimeValue timeout) { WatchDogEntry(TimeValue timeout) {
this.timeout = timeout; this.timeout = timeout;
this.registered = new AtomicBoolean(false); this.registered = new AtomicBoolean(false);
this.matchers = new CopyOnWriteArrayList<>(); this.matchers = new CopyOnWriteArrayList<>();
} }
private void timedOut() {
timedOut = true;
}
private boolean isTimedOut() {
return timedOut;
}
} }
} }
} }

View File

@ -59,21 +59,19 @@ public class TimeoutCheckerTests extends FileStructureTestCase {
} }
} }
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/48861")
public void testWatchdog() throws Exception { public void testWatchdog() throws Exception {
TimeValue timeout = TimeValue.timeValueMillis(500); final TimeValue timeout = TimeValue.timeValueMillis(randomIntBetween(10, 500));
try (TimeoutChecker timeoutChecker = new TimeoutChecker("watchdog test", timeout, scheduler)) { try (TimeoutChecker timeoutChecker = new TimeoutChecker("watchdog test", timeout, scheduler)) {
TimeoutChecker.TimeoutCheckerWatchdog watchdog = (TimeoutChecker.TimeoutCheckerWatchdog) TimeoutChecker.watchdog; final TimeoutChecker.TimeoutCheckerWatchdog watchdog = (TimeoutChecker.TimeoutCheckerWatchdog) TimeoutChecker.watchdog;
Matcher matcher = mock(Matcher.class); Matcher matcher = mock(Matcher.class);
TimeoutChecker.watchdog.register(matcher); watchdog.register(matcher);
assertThat(watchdog.registry.get(Thread.currentThread()).matchers.size(), equalTo(1)); assertThat(watchdog.registry.get(Thread.currentThread()).matchers.size(), equalTo(1));
try { try {
assertBusy(() -> { assertBusy(() -> {
verify(matcher).interrupt(); verify(matcher).interrupt();
}); });
} finally { } finally {
TimeoutChecker.watchdog.unregister(matcher); watchdog.unregister(matcher);
assertThat(watchdog.registry.get(Thread.currentThread()).matchers.size(), equalTo(0)); assertThat(watchdog.registry.get(Thread.currentThread()).matchers.size(), equalTo(0));
} }
} }