Change grok watch dog to be Matcher based instead of thread based. (#48346)
There is a watchdog in order to avoid long running (and expensive) grok expressions. Currently the watchdog is thread based, threads that run grok expressions are registered and after completion unregister. If these threads stay registered for too long then the watch dog interrupts these threads. Joni (the library that powers grok expressions) has a mechanism that checks whether the current thread is interrupted and if so abort the pattern matching. Newer versions have an additional method to abort long running pattern matching inside joni. Instead of checking the thread's interrupted flag, joni now also checks a volatile field that can be set via a `Matcher` instance. This is more efficient method for aborting long running matches. (joni checks each 30k iterations whether interrupted flag is set vs. just checking a volatile field) Recently we upgraded to a recent joni version (#47374), and this PR is a followup of that PR. This change should also fix #43673, since it appears when unit tests are ran the a test runner thread's interrupted flag may already have been set, due to some thread reuse.
This commit is contained in:
parent
5ecfcdb162
commit
b034153df7
|
@ -74,24 +74,24 @@ public final class Grok {
|
||||||
private final Map<String, String> patternBank;
|
private final Map<String, String> patternBank;
|
||||||
private final boolean namedCaptures;
|
private final boolean namedCaptures;
|
||||||
private final Regex compiledExpression;
|
private final Regex compiledExpression;
|
||||||
private final ThreadWatchdog threadWatchdog;
|
private final MatcherWatchdog matcherWatchdog;
|
||||||
|
|
||||||
public Grok(Map<String, String> patternBank, String grokPattern) {
|
public Grok(Map<String, String> patternBank, String grokPattern) {
|
||||||
this(patternBank, grokPattern, true, ThreadWatchdog.noop());
|
this(patternBank, grokPattern, true, MatcherWatchdog.noop());
|
||||||
}
|
}
|
||||||
|
|
||||||
public Grok(Map<String, String> patternBank, String grokPattern, ThreadWatchdog threadWatchdog) {
|
public Grok(Map<String, String> patternBank, String grokPattern, MatcherWatchdog matcherWatchdog) {
|
||||||
this(patternBank, grokPattern, true, threadWatchdog);
|
this(patternBank, grokPattern, true, matcherWatchdog);
|
||||||
}
|
}
|
||||||
|
|
||||||
Grok(Map<String, String> patternBank, String grokPattern, boolean namedCaptures) {
|
Grok(Map<String, String> patternBank, String grokPattern, boolean namedCaptures) {
|
||||||
this(patternBank, grokPattern, namedCaptures, ThreadWatchdog.noop());
|
this(patternBank, grokPattern, namedCaptures, MatcherWatchdog.noop());
|
||||||
}
|
}
|
||||||
|
|
||||||
private Grok(Map<String, String> patternBank, String grokPattern, boolean namedCaptures, ThreadWatchdog threadWatchdog) {
|
private Grok(Map<String, String> patternBank, String grokPattern, boolean namedCaptures, MatcherWatchdog matcherWatchdog) {
|
||||||
this.patternBank = patternBank;
|
this.patternBank = patternBank;
|
||||||
this.namedCaptures = namedCaptures;
|
this.namedCaptures = namedCaptures;
|
||||||
this.threadWatchdog = threadWatchdog;
|
this.matcherWatchdog = matcherWatchdog;
|
||||||
|
|
||||||
for (Map.Entry<String, String> entry : patternBank.entrySet()) {
|
for (Map.Entry<String, String> entry : patternBank.entrySet()) {
|
||||||
String name = entry.getKey();
|
String name = entry.getKey();
|
||||||
|
@ -172,14 +172,12 @@ public final class Grok {
|
||||||
|
|
||||||
int result;
|
int result;
|
||||||
try {
|
try {
|
||||||
threadWatchdog.register();
|
matcherWatchdog.register(matcher);
|
||||||
result = matcher.searchInterruptible(0, grokPatternBytes.length, Option.NONE);
|
result = matcher.search(0, grokPatternBytes.length, Option.NONE);
|
||||||
} catch (InterruptedException e) {
|
|
||||||
result = Matcher.INTERRUPTED;
|
|
||||||
} finally {
|
} finally {
|
||||||
threadWatchdog.unregister();
|
matcherWatchdog.unregister(matcher);
|
||||||
}
|
}
|
||||||
if (result != -1) {
|
if (result >= 0) {
|
||||||
Region region = matcher.getEagerRegion();
|
Region region = matcher.getEagerRegion();
|
||||||
String namedPatternRef = groupMatch(NAME_GROUP, region, grokPattern);
|
String namedPatternRef = groupMatch(NAME_GROUP, region, grokPattern);
|
||||||
String subName = groupMatch(SUBNAME_GROUP, region, grokPattern);
|
String subName = groupMatch(SUBNAME_GROUP, region, grokPattern);
|
||||||
|
@ -217,18 +215,16 @@ public final class Grok {
|
||||||
* Checks whether a specific text matches the defined grok expression.
|
* Checks whether a specific text matches the defined grok expression.
|
||||||
*
|
*
|
||||||
* @param text the string to match
|
* @param text the string to match
|
||||||
* @return true if grok expression matches text, false otherwise.
|
* @return true if grok expression matches text or there is a timeout, false otherwise.
|
||||||
*/
|
*/
|
||||||
public boolean match(String text) {
|
public boolean match(String text) {
|
||||||
Matcher matcher = compiledExpression.matcher(text.getBytes(StandardCharsets.UTF_8));
|
Matcher matcher = compiledExpression.matcher(text.getBytes(StandardCharsets.UTF_8));
|
||||||
int result;
|
int result;
|
||||||
try {
|
try {
|
||||||
threadWatchdog.register();
|
matcherWatchdog.register(matcher);
|
||||||
result = matcher.searchInterruptible(0, text.length(), Option.DEFAULT);
|
result = matcher.search(0, text.length(), Option.DEFAULT);
|
||||||
} catch (InterruptedException e) {
|
|
||||||
result = Matcher.INTERRUPTED;
|
|
||||||
} finally {
|
} finally {
|
||||||
threadWatchdog.unregister();
|
matcherWatchdog.unregister(matcher);
|
||||||
}
|
}
|
||||||
return (result != -1);
|
return (result != -1);
|
||||||
}
|
}
|
||||||
|
@ -245,16 +241,14 @@ public final class Grok {
|
||||||
Matcher matcher = compiledExpression.matcher(textAsBytes);
|
Matcher matcher = compiledExpression.matcher(textAsBytes);
|
||||||
int result;
|
int result;
|
||||||
try {
|
try {
|
||||||
threadWatchdog.register();
|
matcherWatchdog.register(matcher);
|
||||||
result = matcher.searchInterruptible(0, textAsBytes.length, Option.DEFAULT);
|
result = matcher.search(0, textAsBytes.length, Option.DEFAULT);
|
||||||
} catch (InterruptedException e) {
|
|
||||||
result = Matcher.INTERRUPTED;
|
|
||||||
} finally {
|
} finally {
|
||||||
threadWatchdog.unregister();
|
matcherWatchdog.unregister(matcher);
|
||||||
}
|
}
|
||||||
if (result == Matcher.INTERRUPTED) {
|
if (result == Matcher.INTERRUPTED) {
|
||||||
throw new RuntimeException("grok pattern matching was interrupted after [" +
|
throw new RuntimeException("grok pattern matching was interrupted after [" +
|
||||||
threadWatchdog.maxExecutionTimeInMillis() + "] ms");
|
matcherWatchdog.maxExecutionTimeInMillis() + "] ms");
|
||||||
} else if (result == Matcher.FAILED) {
|
} else if (result == Matcher.FAILED) {
|
||||||
// TODO: I think we should throw an error here?
|
// TODO: I think we should throw an error here?
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.grok;
|
package org.elasticsearch.grok;
|
||||||
|
|
||||||
|
import org.joni.Matcher;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
@ -27,7 +29,7 @@ import java.util.function.LongSupplier;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Protects against long running operations that happen between the register and unregister invocations.
|
* Protects against long running operations that happen between the register and unregister invocations.
|
||||||
* Threads that invoke {@link #register()}, but take too long to invoke the {@link #unregister()} method
|
* Threads that invoke {@link #register(Matcher)}, but take too long to invoke the {@link #unregister(Matcher)} method
|
||||||
* will be interrupted.
|
* will be interrupted.
|
||||||
*
|
*
|
||||||
* This is needed for Joni's {@link org.joni.Matcher#search(int, int, int)} method, because
|
* This is needed for Joni's {@link org.joni.Matcher#search(int, int, int)} method, because
|
||||||
|
@ -35,28 +37,32 @@ import java.util.function.LongSupplier;
|
||||||
* that for every 30k iterations it checks if the current thread is interrupted and if so
|
* that for every 30k iterations it checks if the current thread is interrupted and if so
|
||||||
* returns {@link org.joni.Matcher#INTERRUPTED}.
|
* returns {@link org.joni.Matcher#INTERRUPTED}.
|
||||||
*/
|
*/
|
||||||
public interface ThreadWatchdog {
|
public interface MatcherWatchdog {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Registers the current thread and interrupts the current thread
|
* Registers the current matcher and interrupts the this matcher
|
||||||
* if the takes too long for this thread to invoke {@link #unregister()}.
|
* if the takes too long for this thread to invoke {@link #unregister(Matcher)}.
|
||||||
|
*
|
||||||
|
* @param matcher The matcher to register
|
||||||
*/
|
*/
|
||||||
void register();
|
void register(Matcher matcher);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return The maximum allowed time in milliseconds for a thread to invoke {@link #unregister()}
|
* @return The maximum allowed time in milliseconds for a thread to invoke {@link #unregister(Matcher)}
|
||||||
* after {@link #register()} has been invoked before this ThreadWatchDog starts to interrupting that thread.
|
* after {@link #register(Matcher)} has been invoked before this ThreadWatchDog starts to interrupting that thread.
|
||||||
*/
|
*/
|
||||||
long maxExecutionTimeInMillis();
|
long maxExecutionTimeInMillis();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unregisters the current thread and prevents it from being interrupted.
|
* Unregisters the current matcher and prevents it from being interrupted.
|
||||||
|
*
|
||||||
|
* @param matcher The matcher to unregister
|
||||||
*/
|
*/
|
||||||
void unregister();
|
void unregister(Matcher matcher);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns an implementation that checks for each fixed interval if there are threads that have invoked {@link #register()}
|
* Returns an implementation that checks for each fixed interval if there are threads that have invoked {@link #register(Matcher)}
|
||||||
* and not {@link #unregister()} and have been in this state for longer than the specified max execution interval and
|
* and not {@link #unregister(Matcher)} and have been in this state for longer than the specified max execution interval and
|
||||||
* then interrupts these threads.
|
* then interrupts these threads.
|
||||||
*
|
*
|
||||||
* @param interval The fixed interval to check if there are threads to interrupt
|
* @param interval The fixed interval to check if there are threads to interrupt
|
||||||
|
@ -64,51 +70,51 @@ public interface ThreadWatchdog {
|
||||||
* @param relativeTimeSupplier A supplier that returns relative time
|
* @param relativeTimeSupplier A supplier that returns relative time
|
||||||
* @param scheduler A scheduler that is able to execute a command for each fixed interval
|
* @param scheduler A scheduler that is able to execute a command for each fixed interval
|
||||||
*/
|
*/
|
||||||
static ThreadWatchdog newInstance(long interval,
|
static MatcherWatchdog newInstance(long interval,
|
||||||
long maxExecutionTime,
|
long maxExecutionTime,
|
||||||
LongSupplier relativeTimeSupplier,
|
LongSupplier relativeTimeSupplier,
|
||||||
BiConsumer<Long, Runnable> scheduler) {
|
BiConsumer<Long, Runnable> scheduler) {
|
||||||
return new Default(interval, maxExecutionTime, relativeTimeSupplier, scheduler);
|
return new Default(interval, maxExecutionTime, relativeTimeSupplier, scheduler);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return A noop implementation that does not interrupt threads and is useful for testing and pre-defined grok expressions.
|
* @return A noop implementation that does not interrupt threads and is useful for testing and pre-defined grok expressions.
|
||||||
*/
|
*/
|
||||||
static ThreadWatchdog noop() {
|
static MatcherWatchdog noop() {
|
||||||
return Noop.INSTANCE;
|
return Noop.INSTANCE;
|
||||||
}
|
}
|
||||||
|
|
||||||
class Noop implements ThreadWatchdog {
|
class Noop implements MatcherWatchdog {
|
||||||
|
|
||||||
private static final Noop INSTANCE = new Noop();
|
private static final Noop INSTANCE = new Noop();
|
||||||
|
|
||||||
private Noop() {
|
private Noop() {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void register() {
|
public void register(Matcher matcher) {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long maxExecutionTimeInMillis() {
|
public long maxExecutionTimeInMillis() {
|
||||||
return Long.MAX_VALUE;
|
return Long.MAX_VALUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void unregister() {
|
public void unregister(Matcher matcher) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class Default implements ThreadWatchdog {
|
class Default implements MatcherWatchdog {
|
||||||
|
|
||||||
private final long interval;
|
private final long interval;
|
||||||
private final long maxExecutionTime;
|
private final long maxExecutionTime;
|
||||||
private final LongSupplier relativeTimeSupplier;
|
private final LongSupplier relativeTimeSupplier;
|
||||||
private final BiConsumer<Long, Runnable> scheduler;
|
private final BiConsumer<Long, Runnable> scheduler;
|
||||||
private final AtomicInteger registered = new AtomicInteger(0);
|
private final AtomicInteger registered = new AtomicInteger(0);
|
||||||
private final AtomicBoolean running = new AtomicBoolean(false);
|
private final AtomicBoolean running = new AtomicBoolean(false);
|
||||||
final ConcurrentHashMap<Thread, Long> registry = new ConcurrentHashMap<>();
|
final ConcurrentHashMap<Matcher, Long> registry = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
private Default(long interval,
|
private Default(long interval,
|
||||||
long maxExecutionTime,
|
long maxExecutionTime,
|
||||||
LongSupplier relativeTimeSupplier,
|
LongSupplier relativeTimeSupplier,
|
||||||
|
@ -118,30 +124,30 @@ public interface ThreadWatchdog {
|
||||||
this.relativeTimeSupplier = relativeTimeSupplier;
|
this.relativeTimeSupplier = relativeTimeSupplier;
|
||||||
this.scheduler = scheduler;
|
this.scheduler = scheduler;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void register() {
|
public void register(Matcher matcher) {
|
||||||
registered.getAndIncrement();
|
registered.getAndIncrement();
|
||||||
Long previousValue = registry.put(Thread.currentThread(), relativeTimeSupplier.getAsLong());
|
Long previousValue = registry.put(matcher, relativeTimeSupplier.getAsLong());
|
||||||
if (running.compareAndSet(false, true) == true) {
|
if (running.compareAndSet(false, true) == true) {
|
||||||
scheduler.accept(interval, this::interruptLongRunningExecutions);
|
scheduler.accept(interval, this::interruptLongRunningExecutions);
|
||||||
}
|
}
|
||||||
assert previousValue == null;
|
assert previousValue == null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long maxExecutionTimeInMillis() {
|
public long maxExecutionTimeInMillis() {
|
||||||
return maxExecutionTime;
|
return maxExecutionTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void unregister() {
|
public void unregister(Matcher matcher) {
|
||||||
Long previousValue = registry.remove(Thread.currentThread());
|
Long previousValue = registry.remove(matcher);
|
||||||
registered.decrementAndGet();
|
registered.decrementAndGet();
|
||||||
assert previousValue != null;
|
assert previousValue != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void interruptLongRunningExecutions() {
|
private void interruptLongRunningExecutions() {
|
||||||
final long currentRelativeTime = relativeTimeSupplier.getAsLong();
|
final long currentRelativeTime = relativeTimeSupplier.getAsLong();
|
||||||
for (Map.Entry<Thread, Long> entry : registry.entrySet()) {
|
for (Map.Entry<Matcher, Long> entry : registry.entrySet()) {
|
||||||
if ((currentRelativeTime - entry.getValue()) > maxExecutionTime) {
|
if ((currentRelativeTime - entry.getValue()) > maxExecutionTime) {
|
||||||
entry.getKey().interrupt();
|
entry.getKey().interrupt();
|
||||||
// not removing the entry here, this happens in the unregister() method.
|
// not removing the entry here, this happens in the unregister() method.
|
||||||
|
@ -153,7 +159,7 @@ public interface ThreadWatchdog {
|
||||||
running.set(false);
|
running.set(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -430,7 +430,7 @@ public class GrokTests extends ESTestCase {
|
||||||
});
|
});
|
||||||
t.start();
|
t.start();
|
||||||
};
|
};
|
||||||
Grok grok = new Grok(basePatterns, grokPattern, ThreadWatchdog.newInstance(10, 200, System::currentTimeMillis, scheduler));
|
Grok grok = new Grok(basePatterns, grokPattern, MatcherWatchdog.newInstance(10, 200, System::currentTimeMillis, scheduler));
|
||||||
Exception e = expectThrows(RuntimeException.class, () -> grok.captures(logLine));
|
Exception e = expectThrows(RuntimeException.class, () -> grok.captures(logLine));
|
||||||
run.set(false);
|
run.set(false);
|
||||||
assertThat(e.getMessage(), equalTo("grok pattern matching was interrupted after [200] ms"));
|
assertThat(e.getMessage(), equalTo("grok pattern matching was interrupted after [200] ms"));
|
||||||
|
|
|
@ -24,6 +24,7 @@ import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
import org.joni.Matcher;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
|
@ -31,15 +32,16 @@ import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.eq;
|
import static org.mockito.Matchers.eq;
|
||||||
import static org.mockito.Mockito.doAnswer;
|
import static org.mockito.Mockito.doAnswer;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.timeout;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||||
import static org.mockito.Mockito.verifyZeroInteractions;
|
import static org.mockito.Mockito.verifyZeroInteractions;
|
||||||
|
|
||||||
public class ThreadWatchdogTests extends ESTestCase {
|
public class MatcherWatchdogTests extends ESTestCase {
|
||||||
|
|
||||||
public void testInterrupt() throws Exception {
|
public void testInterrupt() throws Exception {
|
||||||
AtomicBoolean run = new AtomicBoolean(true); // to avoid a lingering thread when test has completed
|
AtomicBoolean run = new AtomicBoolean(true); // to avoid a lingering thread when test has completed
|
||||||
ThreadWatchdog watchdog = ThreadWatchdog.newInstance(10, 100, System::currentTimeMillis, (delay, command) -> {
|
MatcherWatchdog watchdog = MatcherWatchdog.newInstance(10, 100, System::currentTimeMillis, (delay, command) -> {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(delay);
|
Thread.sleep(delay);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
@ -53,17 +55,17 @@ public class ThreadWatchdogTests extends ESTestCase {
|
||||||
thread.start();
|
thread.start();
|
||||||
});
|
});
|
||||||
|
|
||||||
Map<?, ?> registry = ((ThreadWatchdog.Default) watchdog).registry;
|
Map<?, ?> registry = ((MatcherWatchdog.Default) watchdog).registry;
|
||||||
assertThat(registry.size(), is(0));
|
assertThat(registry.size(), is(0));
|
||||||
// need to call #register() method on a different thread, assertBusy() fails if current thread gets interrupted
|
// need to call #register() method on a different thread, assertBusy() fails if current thread gets interrupted
|
||||||
AtomicBoolean interrupted = new AtomicBoolean(false);
|
AtomicBoolean interrupted = new AtomicBoolean(false);
|
||||||
Thread thread = new Thread(() -> {
|
Thread thread = new Thread(() -> {
|
||||||
Thread currentThread = Thread.currentThread();
|
Matcher matcher = mock(Matcher.class);
|
||||||
watchdog.register();
|
watchdog.register(matcher);
|
||||||
while (currentThread.isInterrupted() == false) {}
|
verify(matcher, timeout(9999).times(1)).interrupt();
|
||||||
interrupted.set(true);
|
interrupted.set(true);
|
||||||
while (run.get()) {} // wait here so that the size of the registry can be asserted
|
while (run.get()) {} // wait here so that the size of the registry can be asserted
|
||||||
watchdog.unregister();
|
watchdog.unregister(matcher);
|
||||||
});
|
});
|
||||||
thread.start();
|
thread.start();
|
||||||
assertBusy(() -> {
|
assertBusy(() -> {
|
||||||
|
@ -79,7 +81,7 @@ public class ThreadWatchdogTests extends ESTestCase {
|
||||||
public void testIdleIfNothingRegistered() throws Exception {
|
public void testIdleIfNothingRegistered() throws Exception {
|
||||||
long interval = 1L;
|
long interval = 1L;
|
||||||
ScheduledExecutorService threadPool = mock(ScheduledExecutorService.class);
|
ScheduledExecutorService threadPool = mock(ScheduledExecutorService.class);
|
||||||
ThreadWatchdog watchdog = ThreadWatchdog.newInstance(interval, Long.MAX_VALUE, System::currentTimeMillis,
|
MatcherWatchdog watchdog = MatcherWatchdog.newInstance(interval, Long.MAX_VALUE, System::currentTimeMillis,
|
||||||
(delay, command) -> threadPool.schedule(command, delay, TimeUnit.MILLISECONDS));
|
(delay, command) -> threadPool.schedule(command, delay, TimeUnit.MILLISECONDS));
|
||||||
// Periodic action is not scheduled because no thread is registered
|
// Periodic action is not scheduled because no thread is registered
|
||||||
verifyZeroInteractions(threadPool);
|
verifyZeroInteractions(threadPool);
|
||||||
|
@ -91,16 +93,20 @@ public class ThreadWatchdogTests extends ESTestCase {
|
||||||
}).when(threadPool).schedule(
|
}).when(threadPool).schedule(
|
||||||
any(Runnable.class), eq(interval), eq(TimeUnit.MILLISECONDS)
|
any(Runnable.class), eq(interval), eq(TimeUnit.MILLISECONDS)
|
||||||
);
|
);
|
||||||
watchdog.register();
|
Matcher matcher = mock(Matcher.class);
|
||||||
|
watchdog.register(matcher);
|
||||||
// Registering the first thread should have caused the command to get scheduled again
|
// Registering the first thread should have caused the command to get scheduled again
|
||||||
Runnable command = commandFuture.get(1L, TimeUnit.MILLISECONDS);
|
Runnable command = commandFuture.get(1L, TimeUnit.MILLISECONDS);
|
||||||
Mockito.reset(threadPool);
|
Mockito.reset(threadPool);
|
||||||
watchdog.unregister();
|
watchdog.unregister(matcher);
|
||||||
command.run();
|
command.run();
|
||||||
// Periodic action is not scheduled again because no thread is registered
|
// Periodic action is not scheduled again because no thread is registered
|
||||||
verifyZeroInteractions(threadPool);
|
verifyZeroInteractions(threadPool);
|
||||||
watchdog.register();
|
watchdog.register(matcher);
|
||||||
Thread otherThread = new Thread(watchdog::register);
|
Thread otherThread = new Thread(() -> {
|
||||||
|
Matcher otherMatcher = mock(Matcher.class);
|
||||||
|
watchdog.register(otherMatcher);
|
||||||
|
});
|
||||||
try {
|
try {
|
||||||
verify(threadPool).schedule(any(Runnable.class), eq(interval), eq(TimeUnit.MILLISECONDS));
|
verify(threadPool).schedule(any(Runnable.class), eq(interval), eq(TimeUnit.MILLISECONDS));
|
||||||
// Registering a second thread does not cause the command to get scheduled twice
|
// Registering a second thread does not cause the command to get scheduled twice
|
|
@ -20,7 +20,7 @@
|
||||||
package org.elasticsearch.ingest.common;
|
package org.elasticsearch.ingest.common;
|
||||||
|
|
||||||
import org.elasticsearch.grok.Grok;
|
import org.elasticsearch.grok.Grok;
|
||||||
import org.elasticsearch.grok.ThreadWatchdog;
|
import org.elasticsearch.grok.MatcherWatchdog;
|
||||||
import org.elasticsearch.ingest.AbstractProcessor;
|
import org.elasticsearch.ingest.AbstractProcessor;
|
||||||
import org.elasticsearch.ingest.ConfigurationUtils;
|
import org.elasticsearch.ingest.ConfigurationUtils;
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
|
@ -44,11 +44,11 @@ public final class GrokProcessor extends AbstractProcessor {
|
||||||
private final boolean ignoreMissing;
|
private final boolean ignoreMissing;
|
||||||
|
|
||||||
GrokProcessor(String tag, Map<String, String> patternBank, List<String> matchPatterns, String matchField,
|
GrokProcessor(String tag, Map<String, String> patternBank, List<String> matchPatterns, String matchField,
|
||||||
boolean traceMatch, boolean ignoreMissing, ThreadWatchdog threadWatchdog) {
|
boolean traceMatch, boolean ignoreMissing, MatcherWatchdog matcherWatchdog) {
|
||||||
super(tag);
|
super(tag);
|
||||||
this.matchField = matchField;
|
this.matchField = matchField;
|
||||||
this.matchPatterns = matchPatterns;
|
this.matchPatterns = matchPatterns;
|
||||||
this.grok = new Grok(patternBank, combinePatterns(matchPatterns, traceMatch), threadWatchdog);
|
this.grok = new Grok(patternBank, combinePatterns(matchPatterns, traceMatch), matcherWatchdog);
|
||||||
this.traceMatch = traceMatch;
|
this.traceMatch = traceMatch;
|
||||||
this.ignoreMissing = ignoreMissing;
|
this.ignoreMissing = ignoreMissing;
|
||||||
}
|
}
|
||||||
|
@ -133,11 +133,11 @@ public final class GrokProcessor extends AbstractProcessor {
|
||||||
public static final class Factory implements Processor.Factory {
|
public static final class Factory implements Processor.Factory {
|
||||||
|
|
||||||
private final Map<String, String> builtinPatterns;
|
private final Map<String, String> builtinPatterns;
|
||||||
private final ThreadWatchdog threadWatchdog;
|
private final MatcherWatchdog matcherWatchdog;
|
||||||
|
|
||||||
public Factory(Map<String, String> builtinPatterns, ThreadWatchdog threadWatchdog) {
|
public Factory(Map<String, String> builtinPatterns, MatcherWatchdog matcherWatchdog) {
|
||||||
this.builtinPatterns = builtinPatterns;
|
this.builtinPatterns = builtinPatterns;
|
||||||
this.threadWatchdog = threadWatchdog;
|
this.matcherWatchdog = matcherWatchdog;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -159,7 +159,7 @@ public final class GrokProcessor extends AbstractProcessor {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
return new GrokProcessor(processorTag, patternBank, matchPatterns, matchField, traceMatch, ignoreMissing,
|
return new GrokProcessor(processorTag, patternBank, matchPatterns, matchField, traceMatch, ignoreMissing,
|
||||||
threadWatchdog);
|
matcherWatchdog);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw newConfigurationException(TYPE, processorTag, "patterns",
|
throw newConfigurationException(TYPE, processorTag, "patterns",
|
||||||
"Invalid regex pattern found in: " + matchPatterns + ". " + e.getMessage());
|
"Invalid regex pattern found in: " + matchPatterns + ". " + e.getMessage());
|
||||||
|
|
|
@ -30,7 +30,7 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.settings.SettingsFilter;
|
import org.elasticsearch.common.settings.SettingsFilter;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.grok.Grok;
|
import org.elasticsearch.grok.Grok;
|
||||||
import org.elasticsearch.grok.ThreadWatchdog;
|
import org.elasticsearch.grok.MatcherWatchdog;
|
||||||
import org.elasticsearch.ingest.DropProcessor;
|
import org.elasticsearch.ingest.DropProcessor;
|
||||||
import org.elasticsearch.ingest.PipelineProcessor;
|
import org.elasticsearch.ingest.PipelineProcessor;
|
||||||
import org.elasticsearch.ingest.Processor;
|
import org.elasticsearch.ingest.Processor;
|
||||||
|
@ -109,10 +109,10 @@ public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPl
|
||||||
return Arrays.asList(WATCHDOG_INTERVAL, WATCHDOG_MAX_EXECUTION_TIME);
|
return Arrays.asList(WATCHDOG_INTERVAL, WATCHDOG_MAX_EXECUTION_TIME);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ThreadWatchdog createGrokThreadWatchdog(Processor.Parameters parameters) {
|
private static MatcherWatchdog createGrokThreadWatchdog(Processor.Parameters parameters) {
|
||||||
long intervalMillis = WATCHDOG_INTERVAL.get(parameters.env.settings()).getMillis();
|
long intervalMillis = WATCHDOG_INTERVAL.get(parameters.env.settings()).getMillis();
|
||||||
long maxExecutionTimeMillis = WATCHDOG_MAX_EXECUTION_TIME.get(parameters.env.settings()).getMillis();
|
long maxExecutionTimeMillis = WATCHDOG_MAX_EXECUTION_TIME.get(parameters.env.settings()).getMillis();
|
||||||
return ThreadWatchdog.newInstance(intervalMillis, maxExecutionTimeMillis,
|
return MatcherWatchdog.newInstance(intervalMillis, maxExecutionTimeMillis,
|
||||||
parameters.relativeTimeSupplier, parameters.scheduler::apply);
|
parameters.relativeTimeSupplier, parameters.scheduler::apply);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
package org.elasticsearch.ingest.common;
|
package org.elasticsearch.ingest.common;
|
||||||
|
|
||||||
import org.elasticsearch.ElasticsearchParseException;
|
import org.elasticsearch.ElasticsearchParseException;
|
||||||
import org.elasticsearch.grok.ThreadWatchdog;
|
import org.elasticsearch.grok.MatcherWatchdog;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -34,7 +34,7 @@ import static org.hamcrest.Matchers.notNullValue;
|
||||||
public class GrokProcessorFactoryTests extends ESTestCase {
|
public class GrokProcessorFactoryTests extends ESTestCase {
|
||||||
|
|
||||||
public void testBuild() throws Exception {
|
public void testBuild() throws Exception {
|
||||||
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop());
|
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), MatcherWatchdog.noop());
|
||||||
|
|
||||||
Map<String, Object> config = new HashMap<>();
|
Map<String, Object> config = new HashMap<>();
|
||||||
config.put("field", "_field");
|
config.put("field", "_field");
|
||||||
|
@ -48,7 +48,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testBuildWithIgnoreMissing() throws Exception {
|
public void testBuildWithIgnoreMissing() throws Exception {
|
||||||
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop());
|
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), MatcherWatchdog.noop());
|
||||||
|
|
||||||
Map<String, Object> config = new HashMap<>();
|
Map<String, Object> config = new HashMap<>();
|
||||||
config.put("field", "_field");
|
config.put("field", "_field");
|
||||||
|
@ -63,7 +63,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testBuildMissingField() throws Exception {
|
public void testBuildMissingField() throws Exception {
|
||||||
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop());
|
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), MatcherWatchdog.noop());
|
||||||
Map<String, Object> config = new HashMap<>();
|
Map<String, Object> config = new HashMap<>();
|
||||||
config.put("patterns", Collections.singletonList("(?<foo>\\w+)"));
|
config.put("patterns", Collections.singletonList("(?<foo>\\w+)"));
|
||||||
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, config));
|
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, config));
|
||||||
|
@ -71,7 +71,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testBuildMissingPatterns() throws Exception {
|
public void testBuildMissingPatterns() throws Exception {
|
||||||
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop());
|
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), MatcherWatchdog.noop());
|
||||||
Map<String, Object> config = new HashMap<>();
|
Map<String, Object> config = new HashMap<>();
|
||||||
config.put("field", "foo");
|
config.put("field", "foo");
|
||||||
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, config));
|
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, config));
|
||||||
|
@ -79,7 +79,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testBuildEmptyPatternsList() throws Exception {
|
public void testBuildEmptyPatternsList() throws Exception {
|
||||||
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop());
|
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), MatcherWatchdog.noop());
|
||||||
Map<String, Object> config = new HashMap<>();
|
Map<String, Object> config = new HashMap<>();
|
||||||
config.put("field", "foo");
|
config.put("field", "foo");
|
||||||
config.put("patterns", Collections.emptyList());
|
config.put("patterns", Collections.emptyList());
|
||||||
|
@ -88,7 +88,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCreateWithCustomPatterns() throws Exception {
|
public void testCreateWithCustomPatterns() throws Exception {
|
||||||
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop());
|
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), MatcherWatchdog.noop());
|
||||||
|
|
||||||
Map<String, Object> config = new HashMap<>();
|
Map<String, Object> config = new HashMap<>();
|
||||||
config.put("field", "_field");
|
config.put("field", "_field");
|
||||||
|
@ -101,7 +101,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCreateWithInvalidPattern() throws Exception {
|
public void testCreateWithInvalidPattern() throws Exception {
|
||||||
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop());
|
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), MatcherWatchdog.noop());
|
||||||
Map<String, Object> config = new HashMap<>();
|
Map<String, Object> config = new HashMap<>();
|
||||||
config.put("field", "_field");
|
config.put("field", "_field");
|
||||||
config.put("patterns", Collections.singletonList("["));
|
config.put("patterns", Collections.singletonList("["));
|
||||||
|
@ -110,7 +110,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCreateWithInvalidPatternDefinition() throws Exception {
|
public void testCreateWithInvalidPatternDefinition() throws Exception {
|
||||||
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop());
|
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), MatcherWatchdog.noop());
|
||||||
Map<String, Object> config = new HashMap<>();
|
Map<String, Object> config = new HashMap<>();
|
||||||
config.put("field", "_field");
|
config.put("field", "_field");
|
||||||
config.put("patterns", Collections.singletonList("%{MY_PATTERN:name}!"));
|
config.put("patterns", Collections.singletonList("%{MY_PATTERN:name}!"));
|
||||||
|
|
|
@ -19,7 +19,7 @@
|
||||||
|
|
||||||
package org.elasticsearch.ingest.common;
|
package org.elasticsearch.ingest.common;
|
||||||
|
|
||||||
import org.elasticsearch.grok.ThreadWatchdog;
|
import org.elasticsearch.grok.MatcherWatchdog;
|
||||||
import org.elasticsearch.ingest.IngestDocument;
|
import org.elasticsearch.ingest.IngestDocument;
|
||||||
import org.elasticsearch.ingest.RandomDocumentPicks;
|
import org.elasticsearch.ingest.RandomDocumentPicks;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
@ -40,7 +40,7 @@ public class GrokProcessorTests extends ESTestCase {
|
||||||
IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
|
IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
|
||||||
doc.setFieldValue(fieldName, "1");
|
doc.setFieldValue(fieldName, "1");
|
||||||
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"),
|
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"),
|
||||||
Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadWatchdog.noop());
|
Collections.singletonList("%{ONE:one}"), fieldName, false, false, MatcherWatchdog.noop());
|
||||||
processor.execute(doc);
|
processor.execute(doc);
|
||||||
assertThat(doc.getFieldValue("one", String.class), equalTo("1"));
|
assertThat(doc.getFieldValue("one", String.class), equalTo("1"));
|
||||||
}
|
}
|
||||||
|
@ -50,7 +50,7 @@ public class GrokProcessorTests extends ESTestCase {
|
||||||
IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
|
IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
|
||||||
doc.setFieldValue(fieldName, "23");
|
doc.setFieldValue(fieldName, "23");
|
||||||
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"),
|
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"),
|
||||||
Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadWatchdog.noop());
|
Collections.singletonList("%{ONE:one}"), fieldName, false, false, MatcherWatchdog.noop());
|
||||||
Exception e = expectThrows(Exception.class, () -> processor.execute(doc));
|
Exception e = expectThrows(Exception.class, () -> processor.execute(doc));
|
||||||
assertThat(e.getMessage(), equalTo("Provided Grok expressions do not match field value: [23]"));
|
assertThat(e.getMessage(), equalTo("Provided Grok expressions do not match field value: [23]"));
|
||||||
}
|
}
|
||||||
|
@ -61,7 +61,7 @@ public class GrokProcessorTests extends ESTestCase {
|
||||||
doc.setFieldValue(fieldName, "23");
|
doc.setFieldValue(fieldName, "23");
|
||||||
Exception e = expectThrows(IllegalArgumentException.class, () -> new GrokProcessor(randomAlphaOfLength(10),
|
Exception e = expectThrows(IllegalArgumentException.class, () -> new GrokProcessor(randomAlphaOfLength(10),
|
||||||
Collections.singletonMap("ONE", "1"), Collections.singletonList("%{NOTONE:not_one}"), fieldName,
|
Collections.singletonMap("ONE", "1"), Collections.singletonList("%{NOTONE:not_one}"), fieldName,
|
||||||
false, false, ThreadWatchdog.noop()));
|
false, false, MatcherWatchdog.noop()));
|
||||||
assertThat(e.getMessage(), equalTo("Unable to find pattern [NOTONE] in Grok's pattern dictionary"));
|
assertThat(e.getMessage(), equalTo("Unable to find pattern [NOTONE] in Grok's pattern dictionary"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -71,7 +71,7 @@ public class GrokProcessorTests extends ESTestCase {
|
||||||
originalDoc.setFieldValue(fieldName, fieldName);
|
originalDoc.setFieldValue(fieldName, fieldName);
|
||||||
IngestDocument doc = new IngestDocument(originalDoc);
|
IngestDocument doc = new IngestDocument(originalDoc);
|
||||||
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.emptyMap(),
|
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.emptyMap(),
|
||||||
Collections.singletonList(fieldName), fieldName, false, false, ThreadWatchdog.noop());
|
Collections.singletonList(fieldName), fieldName, false, false, MatcherWatchdog.noop());
|
||||||
processor.execute(doc);
|
processor.execute(doc);
|
||||||
assertThat(doc, equalTo(originalDoc));
|
assertThat(doc, equalTo(originalDoc));
|
||||||
}
|
}
|
||||||
|
@ -81,7 +81,7 @@ public class GrokProcessorTests extends ESTestCase {
|
||||||
IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
|
IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
|
||||||
doc.setFieldValue(fieldName, null);
|
doc.setFieldValue(fieldName, null);
|
||||||
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"),
|
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"),
|
||||||
Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadWatchdog.noop());
|
Collections.singletonList("%{ONE:one}"), fieldName, false, false, MatcherWatchdog.noop());
|
||||||
Exception e = expectThrows(Exception.class, () -> processor.execute(doc));
|
Exception e = expectThrows(Exception.class, () -> processor.execute(doc));
|
||||||
assertThat(e.getMessage(), equalTo("field [" + fieldName + "] is null, cannot process it."));
|
assertThat(e.getMessage(), equalTo("field [" + fieldName + "] is null, cannot process it."));
|
||||||
}
|
}
|
||||||
|
@ -92,7 +92,7 @@ public class GrokProcessorTests extends ESTestCase {
|
||||||
originalIngestDocument.setFieldValue(fieldName, null);
|
originalIngestDocument.setFieldValue(fieldName, null);
|
||||||
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
|
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
|
||||||
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"),
|
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"),
|
||||||
Collections.singletonList("%{ONE:one}"), fieldName, false, true, ThreadWatchdog.noop());
|
Collections.singletonList("%{ONE:one}"), fieldName, false, true, MatcherWatchdog.noop());
|
||||||
processor.execute(ingestDocument);
|
processor.execute(ingestDocument);
|
||||||
assertIngestDocument(originalIngestDocument, ingestDocument);
|
assertIngestDocument(originalIngestDocument, ingestDocument);
|
||||||
}
|
}
|
||||||
|
@ -102,7 +102,7 @@ public class GrokProcessorTests extends ESTestCase {
|
||||||
IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
|
IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
|
||||||
doc.setFieldValue(fieldName, 1);
|
doc.setFieldValue(fieldName, 1);
|
||||||
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"),
|
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"),
|
||||||
Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadWatchdog.noop());
|
Collections.singletonList("%{ONE:one}"), fieldName, false, false, MatcherWatchdog.noop());
|
||||||
Exception e = expectThrows(Exception.class, () -> processor.execute(doc));
|
Exception e = expectThrows(Exception.class, () -> processor.execute(doc));
|
||||||
assertThat(e.getMessage(), equalTo("field [" + fieldName + "] of type [java.lang.Integer] cannot be cast to [java.lang.String]"));
|
assertThat(e.getMessage(), equalTo("field [" + fieldName + "] of type [java.lang.Integer] cannot be cast to [java.lang.String]"));
|
||||||
}
|
}
|
||||||
|
@ -112,7 +112,7 @@ public class GrokProcessorTests extends ESTestCase {
|
||||||
IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
|
IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
|
||||||
doc.setFieldValue(fieldName, 1);
|
doc.setFieldValue(fieldName, 1);
|
||||||
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"),
|
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"),
|
||||||
Collections.singletonList("%{ONE:one}"), fieldName, false, true, ThreadWatchdog.noop());
|
Collections.singletonList("%{ONE:one}"), fieldName, false, true, MatcherWatchdog.noop());
|
||||||
Exception e = expectThrows(Exception.class, () -> processor.execute(doc));
|
Exception e = expectThrows(Exception.class, () -> processor.execute(doc));
|
||||||
assertThat(e.getMessage(), equalTo("field [" + fieldName + "] of type [java.lang.Integer] cannot be cast to [java.lang.String]"));
|
assertThat(e.getMessage(), equalTo("field [" + fieldName + "] of type [java.lang.Integer] cannot be cast to [java.lang.String]"));
|
||||||
}
|
}
|
||||||
|
@ -121,7 +121,7 @@ public class GrokProcessorTests extends ESTestCase {
|
||||||
String fieldName = "foo.bar";
|
String fieldName = "foo.bar";
|
||||||
IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
|
IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
|
||||||
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"),
|
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"),
|
||||||
Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadWatchdog.noop());
|
Collections.singletonList("%{ONE:one}"), fieldName, false, false, MatcherWatchdog.noop());
|
||||||
Exception e = expectThrows(Exception.class, () -> processor.execute(doc));
|
Exception e = expectThrows(Exception.class, () -> processor.execute(doc));
|
||||||
assertThat(e.getMessage(), equalTo("field [foo] not present as part of path [foo.bar]"));
|
assertThat(e.getMessage(), equalTo("field [foo] not present as part of path [foo.bar]"));
|
||||||
}
|
}
|
||||||
|
@ -131,7 +131,7 @@ public class GrokProcessorTests extends ESTestCase {
|
||||||
IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
|
IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
|
||||||
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
|
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
|
||||||
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"),
|
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"),
|
||||||
Collections.singletonList("%{ONE:one}"), fieldName, false, true, ThreadWatchdog.noop());
|
Collections.singletonList("%{ONE:one}"), fieldName, false, true, MatcherWatchdog.noop());
|
||||||
processor.execute(ingestDocument);
|
processor.execute(ingestDocument);
|
||||||
assertIngestDocument(originalIngestDocument, ingestDocument);
|
assertIngestDocument(originalIngestDocument, ingestDocument);
|
||||||
}
|
}
|
||||||
|
@ -145,7 +145,7 @@ public class GrokProcessorTests extends ESTestCase {
|
||||||
patternBank.put("TWO", "2");
|
patternBank.put("TWO", "2");
|
||||||
patternBank.put("THREE", "3");
|
patternBank.put("THREE", "3");
|
||||||
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank,
|
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank,
|
||||||
Arrays.asList("%{ONE:one}", "%{TWO:two}", "%{THREE:three}"), fieldName, false, false, ThreadWatchdog.noop());
|
Arrays.asList("%{ONE:one}", "%{TWO:two}", "%{THREE:three}"), fieldName, false, false, MatcherWatchdog.noop());
|
||||||
processor.execute(doc);
|
processor.execute(doc);
|
||||||
assertThat(doc.hasField("one"), equalTo(false));
|
assertThat(doc.hasField("one"), equalTo(false));
|
||||||
assertThat(doc.getFieldValue("two", String.class), equalTo("2"));
|
assertThat(doc.getFieldValue("two", String.class), equalTo("2"));
|
||||||
|
@ -161,7 +161,7 @@ public class GrokProcessorTests extends ESTestCase {
|
||||||
patternBank.put("TWO", "2");
|
patternBank.put("TWO", "2");
|
||||||
patternBank.put("THREE", "3");
|
patternBank.put("THREE", "3");
|
||||||
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank,
|
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank,
|
||||||
Arrays.asList("%{ONE:one}", "%{TWO:two}", "%{THREE:three}"), fieldName, true, false, ThreadWatchdog.noop());
|
Arrays.asList("%{ONE:one}", "%{TWO:two}", "%{THREE:three}"), fieldName, true, false, MatcherWatchdog.noop());
|
||||||
processor.execute(doc);
|
processor.execute(doc);
|
||||||
assertThat(doc.hasField("one"), equalTo(false));
|
assertThat(doc.hasField("one"), equalTo(false));
|
||||||
assertThat(doc.getFieldValue("two", String.class), equalTo("2"));
|
assertThat(doc.getFieldValue("two", String.class), equalTo("2"));
|
||||||
|
@ -176,7 +176,7 @@ public class GrokProcessorTests extends ESTestCase {
|
||||||
Map<String, String> patternBank = new HashMap<>();
|
Map<String, String> patternBank = new HashMap<>();
|
||||||
patternBank.put("ONE", "1");
|
patternBank.put("ONE", "1");
|
||||||
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank,
|
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank,
|
||||||
Arrays.asList("%{ONE:one}"), fieldName, true, false, ThreadWatchdog.noop());
|
Arrays.asList("%{ONE:one}"), fieldName, true, false, MatcherWatchdog.noop());
|
||||||
processor.execute(doc);
|
processor.execute(doc);
|
||||||
assertThat(doc.hasField("one"), equalTo(true));
|
assertThat(doc.hasField("one"), equalTo(true));
|
||||||
assertThat(doc.getFieldValue("_ingest._grok_match_index", String.class), equalTo("0"));
|
assertThat(doc.getFieldValue("_ingest._grok_match_index", String.class), equalTo("0"));
|
||||||
|
@ -207,7 +207,7 @@ public class GrokProcessorTests extends ESTestCase {
|
||||||
patternBank.put("TWO", "2");
|
patternBank.put("TWO", "2");
|
||||||
patternBank.put("THREE", "3");
|
patternBank.put("THREE", "3");
|
||||||
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, Arrays.asList("%{ONE:first}-%{TWO:second}",
|
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, Arrays.asList("%{ONE:first}-%{TWO:second}",
|
||||||
"%{ONE:first}-%{THREE:second}"), fieldName, randomBoolean(), randomBoolean(), ThreadWatchdog.noop());
|
"%{ONE:first}-%{THREE:second}"), fieldName, randomBoolean(), randomBoolean(), MatcherWatchdog.noop());
|
||||||
processor.execute(doc);
|
processor.execute(doc);
|
||||||
assertThat(doc.getFieldValue("first", String.class), equalTo("1"));
|
assertThat(doc.getFieldValue("first", String.class), equalTo("1"));
|
||||||
assertThat(doc.getFieldValue("second", String.class), equalTo("3"));
|
assertThat(doc.getFieldValue("second", String.class), equalTo("3"));
|
||||||
|
@ -221,7 +221,7 @@ public class GrokProcessorTests extends ESTestCase {
|
||||||
patternBank.put("ONETWO", "1|2");
|
patternBank.put("ONETWO", "1|2");
|
||||||
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank,
|
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank,
|
||||||
Collections.singletonList("%{ONETWO:first}%{ONETWO:first}"), fieldName, randomBoolean(), randomBoolean(),
|
Collections.singletonList("%{ONETWO:first}%{ONETWO:first}"), fieldName, randomBoolean(), randomBoolean(),
|
||||||
ThreadWatchdog.noop());
|
MatcherWatchdog.noop());
|
||||||
processor.execute(doc);
|
processor.execute(doc);
|
||||||
assertThat(doc.getFieldValue("first", String.class), equalTo("1"));
|
assertThat(doc.getFieldValue("first", String.class), equalTo("1"));
|
||||||
}
|
}
|
||||||
|
@ -235,7 +235,7 @@ public class GrokProcessorTests extends ESTestCase {
|
||||||
patternBank.put("THREE", "3");
|
patternBank.put("THREE", "3");
|
||||||
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank,
|
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank,
|
||||||
Collections.singletonList("%{ONETWO:first}|%{THREE:second}"), fieldName, randomBoolean(), randomBoolean(),
|
Collections.singletonList("%{ONETWO:first}|%{THREE:second}"), fieldName, randomBoolean(), randomBoolean(),
|
||||||
ThreadWatchdog.noop());
|
MatcherWatchdog.noop());
|
||||||
processor.execute(doc);
|
processor.execute(doc);
|
||||||
assertFalse(doc.hasField("first"));
|
assertFalse(doc.hasField("first"));
|
||||||
assertThat(doc.getFieldValue("second", String.class), equalTo("3"));
|
assertThat(doc.getFieldValue("second", String.class), equalTo("3"));
|
||||||
|
|
|
@ -6,15 +6,17 @@
|
||||||
package org.elasticsearch.xpack.ml.filestructurefinder;
|
package org.elasticsearch.xpack.ml.filestructurefinder;
|
||||||
|
|
||||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||||
import org.elasticsearch.common.collect.Tuple;
|
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.FutureUtils;
|
import org.elasticsearch.common.util.concurrent.FutureUtils;
|
||||||
import org.elasticsearch.grok.Grok;
|
import org.elasticsearch.grok.Grok;
|
||||||
import org.elasticsearch.grok.ThreadWatchdog;
|
import org.elasticsearch.grok.MatcherWatchdog;
|
||||||
|
import org.joni.Matcher;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -38,7 +40,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
public class TimeoutChecker implements Closeable {
|
public class TimeoutChecker implements Closeable {
|
||||||
|
|
||||||
private static final TimeoutCheckerWatchdog timeoutCheckerWatchdog = new TimeoutCheckerWatchdog();
|
private static final TimeoutCheckerWatchdog timeoutCheckerWatchdog = new TimeoutCheckerWatchdog();
|
||||||
public static final ThreadWatchdog watchdog = timeoutCheckerWatchdog;
|
public static final MatcherWatchdog watchdog = timeoutCheckerWatchdog;
|
||||||
|
|
||||||
private final String operation;
|
private final String operation;
|
||||||
private final TimeValue timeout;
|
private final TimeValue timeout;
|
||||||
|
@ -122,51 +124,68 @@ public class TimeoutChecker implements Closeable {
|
||||||
/**
|
/**
|
||||||
* An implementation of the type of watchdog used by the {@link Grok} class to interrupt
|
* An implementation of the type of watchdog used by the {@link Grok} class to interrupt
|
||||||
* matching operations that take too long. Rather than have a timeout per match operation
|
* matching operations that take too long. Rather than have a timeout per match operation
|
||||||
* like the {@link ThreadWatchdog.Default} implementation, the interruption is governed by
|
* like the {@link MatcherWatchdog.Default} implementation, the interruption is governed by
|
||||||
* a {@link TimeoutChecker} associated with the thread doing the matching.
|
* a {@link TimeoutChecker} associated with the thread doing the matching.
|
||||||
*/
|
*/
|
||||||
static class TimeoutCheckerWatchdog implements ThreadWatchdog {
|
static class TimeoutCheckerWatchdog implements MatcherWatchdog {
|
||||||
|
|
||||||
final ConcurrentHashMap<Thread, Tuple<AtomicBoolean, TimeValue>> registry = new ConcurrentHashMap<>();
|
final ConcurrentHashMap<Thread, WatchDogEntry> registry = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
void add(Thread thread, TimeValue timeout) {
|
void add(Thread thread, TimeValue timeout) {
|
||||||
Tuple<AtomicBoolean, TimeValue> previousValue = registry.put(thread, new Tuple<>(new AtomicBoolean(false), timeout));
|
WatchDogEntry previousValue = registry.put(thread, new WatchDogEntry(timeout));
|
||||||
assert previousValue == null;
|
assert previousValue == null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void register() {
|
public void register(Matcher matcher) {
|
||||||
Tuple<AtomicBoolean, TimeValue> value = registry.get(Thread.currentThread());
|
WatchDogEntry value = registry.get(Thread.currentThread());
|
||||||
if (value != null) {
|
if (value != null) {
|
||||||
boolean wasFalse = value.v1().compareAndSet(false, true);
|
boolean wasFalse = value.registered.compareAndSet(false, true);
|
||||||
assert wasFalse;
|
assert wasFalse;
|
||||||
|
value.matchers.add(matcher);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long maxExecutionTimeInMillis() {
|
public long maxExecutionTimeInMillis() {
|
||||||
Tuple<AtomicBoolean, TimeValue> value = registry.get(Thread.currentThread());
|
WatchDogEntry value = registry.get(Thread.currentThread());
|
||||||
return value != null ? value.v2().getMillis() : Long.MAX_VALUE;
|
return value != null ? value.timeout.getMillis() : Long.MAX_VALUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void unregister() {
|
public void unregister(Matcher matcher) {
|
||||||
Tuple<AtomicBoolean, TimeValue> value = registry.get(Thread.currentThread());
|
WatchDogEntry value = registry.get(Thread.currentThread());
|
||||||
if (value != null) {
|
if (value != null) {
|
||||||
boolean wasTrue = value.v1().compareAndSet(true, false);
|
boolean wasTrue = value.registered.compareAndSet(true, false);
|
||||||
assert wasTrue;
|
assert wasTrue;
|
||||||
|
value.matchers.remove(matcher);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void remove(Thread thread) {
|
void remove(Thread thread) {
|
||||||
Tuple<AtomicBoolean, TimeValue> previousValue = registry.remove(thread);
|
WatchDogEntry previousValue = registry.remove(thread);
|
||||||
assert previousValue != null;
|
assert previousValue != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
void interruptLongRunningThreadIfRegistered(Thread thread) {
|
void interruptLongRunningThreadIfRegistered(Thread thread) {
|
||||||
Tuple<AtomicBoolean, TimeValue> value = registry.get(thread);
|
WatchDogEntry value = registry.get(thread);
|
||||||
if (value.v1().get()) {
|
if (value.registered.get()) {
|
||||||
thread.interrupt();
|
for (Matcher matcher : value.matchers) {
|
||||||
|
matcher.interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class WatchDogEntry {
|
||||||
|
|
||||||
|
final TimeValue timeout;
|
||||||
|
final AtomicBoolean registered;
|
||||||
|
final Collection<Matcher> matchers;
|
||||||
|
|
||||||
|
WatchDogEntry(TimeValue timeout) {
|
||||||
|
this.timeout = timeout;
|
||||||
|
this.registered = new AtomicBoolean(false);
|
||||||
|
this.matchers = new CopyOnWriteArrayList<>();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,11 +9,16 @@ import org.elasticsearch.ElasticsearchTimeoutException;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.grok.Grok;
|
import org.elasticsearch.grok.Grok;
|
||||||
import org.elasticsearch.threadpool.Scheduler;
|
import org.elasticsearch.threadpool.Scheduler;
|
||||||
|
import org.joni.Matcher;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
|
||||||
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
public class TimeoutCheckerTests extends FileStructureTestCase {
|
public class TimeoutCheckerTests extends FileStructureTestCase {
|
||||||
|
|
||||||
private ScheduledExecutorService scheduler;
|
private ScheduledExecutorService scheduler;
|
||||||
|
@ -29,15 +34,12 @@ public class TimeoutCheckerTests extends FileStructureTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCheckNoTimeout() {
|
public void testCheckNoTimeout() {
|
||||||
|
|
||||||
NOOP_TIMEOUT_CHECKER.check("should never happen");
|
NOOP_TIMEOUT_CHECKER.check("should never happen");
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCheckTimeoutNotExceeded() throws InterruptedException {
|
public void testCheckTimeoutNotExceeded() throws InterruptedException {
|
||||||
|
|
||||||
TimeValue timeout = TimeValue.timeValueSeconds(10);
|
TimeValue timeout = TimeValue.timeValueSeconds(10);
|
||||||
try (TimeoutChecker timeoutChecker = new TimeoutChecker("timeout not exceeded test", timeout, scheduler)) {
|
try (TimeoutChecker timeoutChecker = new TimeoutChecker("timeout not exceeded test", timeout, scheduler)) {
|
||||||
|
|
||||||
for (int count = 0; count < 10; ++count) {
|
for (int count = 0; count < 10; ++count) {
|
||||||
timeoutChecker.check("should not timeout");
|
timeoutChecker.check("should not timeout");
|
||||||
Thread.sleep(randomIntBetween(1, 10));
|
Thread.sleep(randomIntBetween(1, 10));
|
||||||
|
@ -46,10 +48,8 @@ public class TimeoutCheckerTests extends FileStructureTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCheckTimeoutExceeded() throws Exception {
|
public void testCheckTimeoutExceeded() throws Exception {
|
||||||
|
|
||||||
TimeValue timeout = TimeValue.timeValueMillis(10);
|
TimeValue timeout = TimeValue.timeValueMillis(10);
|
||||||
try (TimeoutChecker timeoutChecker = new TimeoutChecker("timeout exceeded test", timeout, scheduler)) {
|
try (TimeoutChecker timeoutChecker = new TimeoutChecker("timeout exceeded test", timeout, scheduler)) {
|
||||||
|
|
||||||
assertBusy(() -> {
|
assertBusy(() -> {
|
||||||
ElasticsearchTimeoutException e = expectThrows(ElasticsearchTimeoutException.class,
|
ElasticsearchTimeoutException e = expectThrows(ElasticsearchTimeoutException.class,
|
||||||
() -> timeoutChecker.check("should timeout"));
|
() -> timeoutChecker.check("should timeout"));
|
||||||
|
@ -59,30 +59,27 @@ public class TimeoutCheckerTests extends FileStructureTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testWatchdog() {
|
public void testWatchdog() throws Exception {
|
||||||
|
TimeValue timeout = TimeValue.timeValueMillis(500);
|
||||||
assertFalse(Thread.interrupted());
|
|
||||||
|
|
||||||
TimeValue timeout = TimeValue.timeValueMillis(100);
|
|
||||||
try (TimeoutChecker timeoutChecker = new TimeoutChecker("watchdog test", timeout, scheduler)) {
|
try (TimeoutChecker timeoutChecker = new TimeoutChecker("watchdog test", timeout, scheduler)) {
|
||||||
|
TimeoutChecker.TimeoutCheckerWatchdog watchdog = (TimeoutChecker.TimeoutCheckerWatchdog) TimeoutChecker.watchdog;
|
||||||
|
|
||||||
TimeoutChecker.watchdog.register();
|
Matcher matcher = mock(Matcher.class);
|
||||||
|
TimeoutChecker.watchdog.register(matcher);
|
||||||
|
assertThat(watchdog.registry.get(Thread.currentThread()).matchers.size(), equalTo(1));
|
||||||
try {
|
try {
|
||||||
expectThrows(InterruptedException.class, () -> Thread.sleep(10000));
|
assertBusy(() -> {
|
||||||
|
verify(matcher).interrupt();
|
||||||
|
});
|
||||||
} finally {
|
} finally {
|
||||||
TimeoutChecker.watchdog.unregister();
|
TimeoutChecker.watchdog.unregister(matcher);
|
||||||
|
assertThat(watchdog.registry.get(Thread.currentThread()).matchers.size(), equalTo(0));
|
||||||
}
|
}
|
||||||
} finally {
|
|
||||||
// ensure the interrupted flag is cleared to stop it making subsequent tests fail
|
|
||||||
Thread.interrupted();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testGrokCaptures() throws Exception {
|
public void testGrokCaptures() throws Exception {
|
||||||
|
|
||||||
assertFalse(Thread.interrupted());
|
|
||||||
Grok grok = new Grok(Grok.getBuiltinPatterns(), "{%DATA:data}{%GREEDYDATA:greedydata}", TimeoutChecker.watchdog);
|
Grok grok = new Grok(Grok.getBuiltinPatterns(), "{%DATA:data}{%GREEDYDATA:greedydata}", TimeoutChecker.watchdog);
|
||||||
|
|
||||||
TimeValue timeout = TimeValue.timeValueMillis(1);
|
TimeValue timeout = TimeValue.timeValueMillis(1);
|
||||||
try (TimeoutChecker timeoutChecker = new TimeoutChecker("grok captures test", timeout, scheduler)) {
|
try (TimeoutChecker timeoutChecker = new TimeoutChecker("grok captures test", timeout, scheduler)) {
|
||||||
|
|
||||||
|
@ -92,9 +89,6 @@ public class TimeoutCheckerTests extends FileStructureTestCase {
|
||||||
assertEquals("Aborting grok captures test during [should timeout] as it has taken longer than the timeout of [" +
|
assertEquals("Aborting grok captures test during [should timeout] as it has taken longer than the timeout of [" +
|
||||||
timeout + "]", e.getMessage());
|
timeout + "]", e.getMessage());
|
||||||
});
|
});
|
||||||
} finally {
|
|
||||||
// ensure the interrupted flag is cleared to stop it making subsequent tests fail
|
|
||||||
Thread.interrupted();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue