INGEST: Fix ThreadWatchDog Throwing on Shutdown (#32578)
* INGEST: Fix ThreadWatchDog Throwing on Shutdown * #32539 is caused by the fact that ThreadWatchDog.Default could throw on shutdown if the ThreadPool is interrupted while `interruptLongRunningExecutions` is in progress. This is a result of the watchdog not having a lifecycle of its own (normally it terminates when the threadpool terminates). * We can't easily use `org.elasticsearch.common.util.concurrent.EsRejectedExecutionException#isExecutorShutdown` to catch this state the same way other components do since thatwould require adding the core lib to Grok as a dependency * Since we have no knowledge of the lifecycle in this compontent since we're only passed the scheduler `BiFunction` I fixed this by only scheduling the watchdog when there's actually registered threads in it. * I think using the patter of locking via two `Atomic*` values should not be much of a performance concern here under load since either the integer will likely be > 0 in this case (because we have multiple Grok in parallel) or the running state will be true because there likely was at least one thread registered when the watchdog ran and so the enqueing of the watchdog task during `register` will happen very rarely here (in the worst case scenario of only a single Grok thread it will happen less frequently than once every `ingest.grok.watchdog.interval`). The atomic update on the count should not be relevant relative to the cost of adding a new node to the CHM either. * Fixes #32539 * Also fixes the watchdog to run if it doens't have to in general.
This commit is contained in:
parent
b2a0f38a0c
commit
4dda5a990b
|
@ -21,6 +21,8 @@ package org.elasticsearch.grok;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.function.BiFunction;
|
import java.util.function.BiFunction;
|
||||||
import java.util.function.LongSupplier;
|
import java.util.function.LongSupplier;
|
||||||
|
|
||||||
|
@ -104,6 +106,8 @@ public interface ThreadWatchdog {
|
||||||
private final long maxExecutionTime;
|
private final long maxExecutionTime;
|
||||||
private final LongSupplier relativeTimeSupplier;
|
private final LongSupplier relativeTimeSupplier;
|
||||||
private final BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler;
|
private final BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler;
|
||||||
|
private final AtomicInteger registered = new AtomicInteger(0);
|
||||||
|
private final AtomicBoolean running = new AtomicBoolean(false);
|
||||||
final ConcurrentHashMap<Thread, Long> registry = new ConcurrentHashMap<>();
|
final ConcurrentHashMap<Thread, Long> registry = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
private Default(long interval,
|
private Default(long interval,
|
||||||
|
@ -114,11 +118,14 @@ public interface ThreadWatchdog {
|
||||||
this.maxExecutionTime = maxExecutionTime;
|
this.maxExecutionTime = maxExecutionTime;
|
||||||
this.relativeTimeSupplier = relativeTimeSupplier;
|
this.relativeTimeSupplier = relativeTimeSupplier;
|
||||||
this.scheduler = scheduler;
|
this.scheduler = scheduler;
|
||||||
scheduler.apply(interval, this::interruptLongRunningExecutions);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void register() {
|
public void register() {
|
||||||
|
registered.getAndIncrement();
|
||||||
Long previousValue = registry.put(Thread.currentThread(), relativeTimeSupplier.getAsLong());
|
Long previousValue = registry.put(Thread.currentThread(), relativeTimeSupplier.getAsLong());
|
||||||
|
if (running.compareAndSet(false, true) == true) {
|
||||||
|
scheduler.apply(interval, this::interruptLongRunningExecutions);
|
||||||
|
}
|
||||||
assert previousValue == null;
|
assert previousValue == null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -129,6 +136,7 @@ public interface ThreadWatchdog {
|
||||||
|
|
||||||
public void unregister() {
|
public void unregister() {
|
||||||
Long previousValue = registry.remove(Thread.currentThread());
|
Long previousValue = registry.remove(Thread.currentThread());
|
||||||
|
registered.decrementAndGet();
|
||||||
assert previousValue != null;
|
assert previousValue != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -140,7 +148,11 @@ public interface ThreadWatchdog {
|
||||||
// not removing the entry here, this happens in the unregister() method.
|
// not removing the entry here, this happens in the unregister() method.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (registered.get() > 0) {
|
||||||
scheduler.apply(interval, this::interruptLongRunningExecutions);
|
scheduler.apply(interval, this::interruptLongRunningExecutions);
|
||||||
|
} else {
|
||||||
|
running.set(false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,12 +18,22 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.grok;
|
package org.elasticsearch.grok;
|
||||||
|
|
||||||
import org.elasticsearch.test.ESTestCase;
|
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.eq;
|
||||||
|
import static org.mockito.Mockito.doAnswer;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||||
|
import static org.mockito.Mockito.verifyZeroInteractions;
|
||||||
|
|
||||||
public class ThreadWatchdogTests extends ESTestCase {
|
public class ThreadWatchdogTests extends ESTestCase {
|
||||||
|
|
||||||
|
@ -67,4 +77,38 @@ public class ThreadWatchdogTests extends ESTestCase {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testIdleIfNothingRegistered() throws Exception {
|
||||||
|
long interval = 1L;
|
||||||
|
ScheduledExecutorService threadPool = mock(ScheduledExecutorService.class);
|
||||||
|
ThreadWatchdog watchdog = ThreadWatchdog.newInstance(interval, Long.MAX_VALUE, System::currentTimeMillis,
|
||||||
|
(delay, command) -> threadPool.schedule(command, delay, TimeUnit.MILLISECONDS));
|
||||||
|
// Periodic action is not scheduled because no thread is registered
|
||||||
|
verifyZeroInteractions(threadPool);
|
||||||
|
CompletableFuture<Runnable> commandFuture = new CompletableFuture<>();
|
||||||
|
// Periodic action is scheduled because a thread is registered
|
||||||
|
doAnswer(invocationOnMock -> {
|
||||||
|
commandFuture.complete((Runnable) invocationOnMock.getArguments()[0]);
|
||||||
|
return null;
|
||||||
|
}).when(threadPool).schedule(
|
||||||
|
any(Runnable.class), eq(interval), eq(TimeUnit.MILLISECONDS)
|
||||||
|
);
|
||||||
|
watchdog.register();
|
||||||
|
// Registering the first thread should have caused the command to get scheduled again
|
||||||
|
Runnable command = commandFuture.get(1L, TimeUnit.MILLISECONDS);
|
||||||
|
Mockito.reset(threadPool);
|
||||||
|
watchdog.unregister();
|
||||||
|
command.run();
|
||||||
|
// Periodic action is not scheduled again because no thread is registered
|
||||||
|
verifyZeroInteractions(threadPool);
|
||||||
|
watchdog.register();
|
||||||
|
Thread otherThread = new Thread(watchdog::register);
|
||||||
|
try {
|
||||||
|
verify(threadPool).schedule(any(Runnable.class), eq(interval), eq(TimeUnit.MILLISECONDS));
|
||||||
|
// Registering a second thread does not cause the command to get scheduled twice
|
||||||
|
verifyNoMoreInteractions(threadPool);
|
||||||
|
otherThread.start();
|
||||||
|
} finally {
|
||||||
|
otherThread.join();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue