[INGEST] Interrupt the current thread if evaluation grok expressions take too long (#31024)

This adds a thread interrupter that allows us to encapsulate calls to org.joni.Matcher#search()
This method can hang forever if the regex expression is too complex.

The thread interrupter in the background checks every 3 seconds whether there are threads
execution the org.joni.Matcher#search() method for longer than 5 seconds and
if so interrupts these threads.

Joni has checks that that for every 30k iterations it checks if the current thread is interrupted and
if so returns org.joni.Matcher#INTERRUPTED

Closes #28731
This commit is contained in:
Martijn van Groningen 2018-06-12 07:49:03 +02:00 committed by GitHub
parent 1dbe554e5e
commit 6030d4be1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 385 additions and 53 deletions

View File

@ -1512,6 +1512,24 @@ The above request will return a response body containing a key-value representat
This can be useful to reference as the built-in patterns change across versions.
[[grok-watchdog]]
==== Grok watchdog
Grok expressions that take too long to execute are interrupted and
the grok processor then fails with an exception. The grok
processor has a watchdog thread that determines when evaluation of
a grok expression takes too long and is controlled by the following
settings:
[[grok-watchdog-options]]
.Grok watchdog settings
[options="header"]
|======
| Name | Default | Description
| `ingest.grok.watchdog.interval` | 1s | How often to check whether there are grok evaluations that take longer than the maximum allowed execution time.
| `ingest.grok.watchdog.max_execution_time` | 1s | The maximum allowed execution of a grok expression evaluation.
|======
[[gsub-processor]]
=== Gsub Processor
Converts a string field by applying a regular expression and a replacement.

View File

@ -76,15 +76,24 @@ public final class Grok {
private final Map<String, String> patternBank;
private final boolean namedCaptures;
private final Regex compiledExpression;
private final ThreadWatchdog threadWatchdog;
public Grok(Map<String, String> patternBank, String grokPattern) {
this(patternBank, grokPattern, true);
this(patternBank, grokPattern, true, ThreadWatchdog.noop());
}
public Grok(Map<String, String> patternBank, String grokPattern, ThreadWatchdog threadWatchdog) {
this(patternBank, grokPattern, true, threadWatchdog);
}
Grok(Map<String, String> patternBank, String grokPattern, boolean namedCaptures) {
this(patternBank, grokPattern, namedCaptures, ThreadWatchdog.noop());
}
@SuppressWarnings("unchecked")
Grok(Map<String, String> patternBank, String grokPattern, boolean namedCaptures) {
private Grok(Map<String, String> patternBank, String grokPattern, boolean namedCaptures, ThreadWatchdog threadWatchdog) {
this.patternBank = patternBank;
this.namedCaptures = namedCaptures;
this.threadWatchdog = threadWatchdog;
for (Map.Entry<String, String> entry : patternBank.entrySet()) {
String name = entry.getKey();
@ -163,7 +172,13 @@ public final class Grok {
byte[] grokPatternBytes = grokPattern.getBytes(StandardCharsets.UTF_8);
Matcher matcher = GROK_PATTERN_REGEX.matcher(grokPatternBytes);
int result = matcher.search(0, grokPatternBytes.length, Option.NONE);
int result;
try {
threadWatchdog.register();
result = matcher.search(0, grokPatternBytes.length, Option.NONE);
} finally {
threadWatchdog.unregister();
}
if (result != -1) {
Region region = matcher.getEagerRegion();
String namedPatternRef = groupMatch(NAME_GROUP, region, grokPattern);
@ -205,7 +220,13 @@ public final class Grok {
*/
public boolean match(String text) {
Matcher matcher = compiledExpression.matcher(text.getBytes(StandardCharsets.UTF_8));
int result = matcher.search(0, text.length(), Option.DEFAULT);
int result;
try {
threadWatchdog.register();
result = matcher.search(0, text.length(), Option.DEFAULT);
} finally {
threadWatchdog.unregister();
}
return (result != -1);
}
@ -220,8 +241,20 @@ public final class Grok {
byte[] textAsBytes = text.getBytes(StandardCharsets.UTF_8);
Map<String, Object> fields = new HashMap<>();
Matcher matcher = compiledExpression.matcher(textAsBytes);
int result = matcher.search(0, textAsBytes.length, Option.DEFAULT);
if (result != -1 && compiledExpression.numberOfNames() > 0) {
int result;
try {
threadWatchdog.register();
result = matcher.search(0, textAsBytes.length, Option.DEFAULT);
} finally {
threadWatchdog.unregister();
}
if (result == Matcher.INTERRUPTED) {
throw new RuntimeException("grok pattern matching was interrupted after [" +
threadWatchdog.maxExecutionTimeInMillis() + "] ms");
} else if (result == Matcher.FAILED) {
// TODO: I think we should throw an error here?
return null;
} else if (compiledExpression.numberOfNames() > 0) {
Region region = matcher.getEagerRegion();
for (Iterator<NameEntry> entry = compiledExpression.namedBackrefIterator(); entry.hasNext();) {
NameEntry e = entry.next();
@ -235,13 +268,9 @@ public final class Grok {
break;
}
}
}
return fields;
} else if (result != -1) {
return fields;
}
return null;
return fields;
}
public static Map<String, String> getBuiltinPatterns() {

View File

@ -0,0 +1,148 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.grok;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;
/**
* Protects against long running operations that happen between the register and unregister invocations.
* Threads that invoke {@link #register()}, but take too long to invoke the {@link #unregister()} method
* will be interrupted.
*
* This is needed for Joni's {@link org.joni.Matcher#search(int, int, int)} method, because
* it can end up spinning endlessly if the regular expression is too complex. Joni has checks
* that for every 30k iterations it checks if the current thread is interrupted and if so
* returns {@link org.joni.Matcher#INTERRUPTED}.
*/
public interface ThreadWatchdog {
/**
* Registers the current thread and interrupts the current thread
* if the takes too long for this thread to invoke {@link #unregister()}.
*/
void register();
/**
* @return The maximum allowed time in milliseconds for a thread to invoke {@link #unregister()}
* after {@link #register()} has been invoked before this ThreadWatchDog starts to interrupting that thread.
*/
long maxExecutionTimeInMillis();
/**
* Unregisters the current thread and prevents it from being interrupted.
*/
void unregister();
/**
* Returns an implementation that checks for each fixed interval if there are threads that have invoked {@link #register()}
* and not {@link #unregister()} and have been in this state for longer than the specified max execution interval and
* then interrupts these threads.
*
* @param interval The fixed interval to check if there are threads to interrupt
* @param maxExecutionTime The time a thread has the execute an operation.
* @param relativeTimeSupplier A supplier that returns relative time
* @param scheduler A scheduler that is able to execute a command for each fixed interval
*/
static ThreadWatchdog newInstance(long interval,
long maxExecutionTime,
LongSupplier relativeTimeSupplier,
BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler) {
return new Default(interval, maxExecutionTime, relativeTimeSupplier, scheduler);
}
/**
* @return A noop implementation that does not interrupt threads and is useful for testing and pre-defined grok expressions.
*/
static ThreadWatchdog noop() {
return Noop.INSTANCE;
}
class Noop implements ThreadWatchdog {
private static final Noop INSTANCE = new Noop();
private Noop() {
}
@Override
public void register() {
}
@Override
public long maxExecutionTimeInMillis() {
return Long.MAX_VALUE;
}
@Override
public void unregister() {
}
}
class Default implements ThreadWatchdog {
private final long interval;
private final long maxExecutionTime;
private final LongSupplier relativeTimeSupplier;
private final BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler;
final ConcurrentHashMap<Thread, Long> registry = new ConcurrentHashMap<>();
private Default(long interval,
long maxExecutionTime,
LongSupplier relativeTimeSupplier,
BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler) {
this.interval = interval;
this.maxExecutionTime = maxExecutionTime;
this.relativeTimeSupplier = relativeTimeSupplier;
this.scheduler = scheduler;
scheduler.apply(interval, this::interruptLongRunningExecutions);
}
public void register() {
Long previousValue = registry.put(Thread.currentThread(), relativeTimeSupplier.getAsLong());
assert previousValue == null;
}
@Override
public long maxExecutionTimeInMillis() {
return maxExecutionTime;
}
public void unregister() {
Long previousValue = registry.remove(Thread.currentThread());
assert previousValue != null;
}
private void interruptLongRunningExecutions() {
final long currentRelativeTime = relativeTimeSupplier.getAsLong();
for (Map.Entry<Thread, Long> entry : registry.entrySet()) {
if ((currentRelativeTime - entry.getValue()) > maxExecutionTime) {
entry.getKey().interrupt();
// not removing the entry here, this happens in the unregister() method.
}
}
scheduler.apply(interval, this::interruptLongRunningExecutions);
}
}
}

View File

@ -20,7 +20,6 @@
package org.elasticsearch.grok;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.util.Arrays;
import java.util.Collections;
@ -28,6 +27,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
@ -35,12 +37,7 @@ import static org.hamcrest.Matchers.nullValue;
public class GrokTests extends ESTestCase {
private Map<String, String> basePatterns;
@Before
public void setup() {
basePatterns = Grok.getBuiltinPatterns();
}
private static final Map<String, String> basePatterns = Grok.getBuiltinPatterns();
public void testMatchWithoutCaptures() {
String line = "value";
@ -415,4 +412,31 @@ public class GrokTests extends ESTestCase {
expected.put("num", "1");
assertThat(grok.captures("12"), equalTo(expected));
}
public void testExponentialExpressions() {
AtomicBoolean run = new AtomicBoolean(true); // to avoid a lingering thread when test has completed
String grokPattern = "Bonsuche mit folgender Anfrage: Belegart->\\[%{WORD:param2},(?<param5>(\\s*%{NOTSPACE})*)\\] " +
"Zustand->ABGESCHLOSSEN Kassennummer->%{WORD:param9} Bonnummer->%{WORD:param10} Datum->%{DATESTAMP_OTHER:param11}";
String logLine = "Bonsuche mit folgender Anfrage: Belegart->[EINGESCHRAENKTER_VERKAUF, VERKAUF, NACHERFASSUNG] " +
"Zustand->ABGESCHLOSSEN Kassennummer->2 Bonnummer->6362 Datum->Mon Jan 08 00:00:00 UTC 2018";
BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler = (delay, command) -> {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
throw new AssertionError(e);
}
Thread t = new Thread(() -> {
if (run.get()) {
command.run();
}
});
t.start();
return null;
};
Grok grok = new Grok(basePatterns, grokPattern, ThreadWatchdog.newInstance(10, 200, System::currentTimeMillis, scheduler));
Exception e = expectThrows(RuntimeException.class, () -> grok.captures(logLine));
run.set(false);
assertThat(e.getMessage(), equalTo("grok pattern matching was interrupted after [200] ms"));
}
}

View File

@ -0,0 +1,70 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.grok;
import org.elasticsearch.test.ESTestCase;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.Matchers.is;
public class ThreadWatchdogTests extends ESTestCase {
public void testInterrupt() throws Exception {
AtomicBoolean run = new AtomicBoolean(true); // to avoid a lingering thread when test has completed
ThreadWatchdog watchdog = ThreadWatchdog.newInstance(10, 100, System::currentTimeMillis, (delay, command) -> {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
throw new AssertionError(e);
}
Thread thread = new Thread(() -> {
if (run.get()) {
command.run();
}
});
thread.start();
return null;
});
Map<?, ?> registry = ((ThreadWatchdog.Default) watchdog).registry;
assertThat(registry.size(), is(0));
// need to call #register() method on a different thread, assertBusy() fails if current thread gets interrupted
AtomicBoolean interrupted = new AtomicBoolean(false);
Thread thread = new Thread(() -> {
Thread currentThread = Thread.currentThread();
watchdog.register();
while (currentThread.isInterrupted() == false) {}
interrupted.set(true);
while (run.get()) {} // wait here so that the size of the registry can be asserted
watchdog.unregister();
});
thread.start();
assertBusy(() -> {
assertThat(interrupted.get(), is(true));
assertThat(registry.size(), is(1));
});
run.set(false);
assertBusy(() -> {
assertThat(registry.size(), is(0));
});
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.ingest.common;
import org.elasticsearch.grok.Grok;
import org.elasticsearch.grok.ThreadWatchdog;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
@ -43,11 +44,11 @@ public final class GrokProcessor extends AbstractProcessor {
private final boolean ignoreMissing;
GrokProcessor(String tag, Map<String, String> patternBank, List<String> matchPatterns, String matchField,
boolean traceMatch, boolean ignoreMissing) {
boolean traceMatch, boolean ignoreMissing, ThreadWatchdog threadWatchdog) {
super(tag);
this.matchField = matchField;
this.matchPatterns = matchPatterns;
this.grok = new Grok(patternBank, combinePatterns(matchPatterns, traceMatch));
this.grok = new Grok(patternBank, combinePatterns(matchPatterns, traceMatch), threadWatchdog);
this.traceMatch = traceMatch;
this.ignoreMissing = ignoreMissing;
}
@ -132,9 +133,11 @@ public final class GrokProcessor extends AbstractProcessor {
public static final class Factory implements Processor.Factory {
private final Map<String, String> builtinPatterns;
private final ThreadWatchdog threadWatchdog;
public Factory(Map<String, String> builtinPatterns) {
public Factory(Map<String, String> builtinPatterns, ThreadWatchdog threadWatchdog) {
this.builtinPatterns = builtinPatterns;
this.threadWatchdog = threadWatchdog;
}
@Override
@ -155,7 +158,8 @@ public final class GrokProcessor extends AbstractProcessor {
}
try {
return new GrokProcessor(processorTag, patternBank, matchPatterns, matchField, traceMatch, ignoreMissing);
return new GrokProcessor(processorTag, patternBank, matchPatterns, matchField, traceMatch, ignoreMissing,
threadWatchdog);
} catch (Exception e) {
throw newConfigurationException(TYPE, processorTag, "patterns",
"Invalid regex pattern found in: " + matchPatterns + ". " + e.getMessage());

View File

@ -25,9 +25,12 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.grok.Grok;
import org.elasticsearch.grok.ThreadWatchdog;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.IngestPlugin;
@ -45,6 +48,10 @@ import java.util.function.Supplier;
public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPlugin {
static final Map<String, String> GROK_PATTERNS = Grok.getBuiltinPatterns();
static final Setting<TimeValue> WATCHDOG_INTERVAL =
Setting.timeSetting("ingest.grok.watchdog.interval", TimeValue.timeValueSeconds(1), Setting.Property.NodeScope);
static final Setting<TimeValue> WATCHDOG_MAX_EXECUTION_TIME =
Setting.timeSetting("ingest.grok.watchdog.max_execution_time", TimeValue.timeValueSeconds(1), Setting.Property.NodeScope);
public IngestCommonPlugin() {
}
@ -68,7 +75,7 @@ public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPl
processors.put(ForEachProcessor.TYPE, new ForEachProcessor.Factory());
processors.put(DateIndexNameProcessor.TYPE, new DateIndexNameProcessor.Factory());
processors.put(SortProcessor.TYPE, new SortProcessor.Factory());
processors.put(GrokProcessor.TYPE, new GrokProcessor.Factory(GROK_PATTERNS));
processors.put(GrokProcessor.TYPE, new GrokProcessor.Factory(GROK_PATTERNS, createGrokThreadWatchdog(parameters)));
processors.put(ScriptProcessor.TYPE, new ScriptProcessor.Factory(parameters.scriptService));
processors.put(DotExpanderProcessor.TYPE, new DotExpanderProcessor.Factory());
processors.put(JsonProcessor.TYPE, new JsonProcessor.Factory());
@ -89,5 +96,16 @@ public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPl
Supplier<DiscoveryNodes> nodesInCluster) {
return Arrays.asList(new GrokProcessorGetAction.RestAction(settings, restController));
}
@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(WATCHDOG_INTERVAL, WATCHDOG_MAX_EXECUTION_TIME);
}
private static ThreadWatchdog createGrokThreadWatchdog(Processor.Parameters parameters) {
long intervalMillis = WATCHDOG_INTERVAL.get(parameters.env.settings()).getMillis();
long maxExecutionTimeMillis = WATCHDOG_MAX_EXECUTION_TIME.get(parameters.env.settings()).getMillis();
return ThreadWatchdog.newInstance(intervalMillis, maxExecutionTimeMillis, parameters.relativeTimeSupplier, parameters.scheduler);
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.ingest.common;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.grok.ThreadWatchdog;
import org.elasticsearch.test.ESTestCase;
import java.util.Collections;
@ -33,7 +34,7 @@ import static org.hamcrest.Matchers.notNullValue;
public class GrokProcessorFactoryTests extends ESTestCase {
public void testBuild() throws Exception {
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap());
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop());
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
@ -47,7 +48,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
}
public void testBuildWithIgnoreMissing() throws Exception {
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap());
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop());
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
@ -62,7 +63,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
}
public void testBuildMissingField() throws Exception {
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap());
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop());
Map<String, Object> config = new HashMap<>();
config.put("patterns", Collections.singletonList("(?<foo>\\w+)"));
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, config));
@ -70,7 +71,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
}
public void testBuildMissingPatterns() throws Exception {
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap());
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop());
Map<String, Object> config = new HashMap<>();
config.put("field", "foo");
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, config));
@ -78,7 +79,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
}
public void testBuildEmptyPatternsList() throws Exception {
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap());
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop());
Map<String, Object> config = new HashMap<>();
config.put("field", "foo");
config.put("patterns", Collections.emptyList());
@ -87,7 +88,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
}
public void testCreateWithCustomPatterns() throws Exception {
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap());
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop());
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
@ -100,7 +101,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
}
public void testCreateWithInvalidPattern() throws Exception {
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap());
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop());
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("patterns", Collections.singletonList("["));
@ -109,7 +110,7 @@ public class GrokProcessorFactoryTests extends ESTestCase {
}
public void testCreateWithInvalidPatternDefinition() throws Exception {
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap());
GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop());
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("patterns", Collections.singletonList("%{MY_PATTERN:name}!"));

View File

@ -19,6 +19,7 @@
package org.elasticsearch.ingest.common;
import org.elasticsearch.grok.ThreadWatchdog;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.test.ESTestCase;
@ -39,7 +40,7 @@ public class GrokProcessorTests extends ESTestCase {
IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
doc.setFieldValue(fieldName, "1");
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"),
Collections.singletonList("%{ONE:one}"), fieldName, false, false);
Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadWatchdog.noop());
processor.execute(doc);
assertThat(doc.getFieldValue("one", String.class), equalTo("1"));
}
@ -49,7 +50,7 @@ public class GrokProcessorTests extends ESTestCase {
IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
doc.setFieldValue(fieldName, "23");
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"),
Collections.singletonList("%{ONE:one}"), fieldName, false, false);
Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadWatchdog.noop());
Exception e = expectThrows(Exception.class, () -> processor.execute(doc));
assertThat(e.getMessage(), equalTo("Provided Grok expressions do not match field value: [23]"));
}
@ -60,7 +61,7 @@ public class GrokProcessorTests extends ESTestCase {
doc.setFieldValue(fieldName, "23");
Exception e = expectThrows(IllegalArgumentException.class, () -> new GrokProcessor(randomAlphaOfLength(10),
Collections.singletonMap("ONE", "1"), Collections.singletonList("%{NOTONE:not_one}"), fieldName,
false, false));
false, false, ThreadWatchdog.noop()));
assertThat(e.getMessage(), equalTo("Unable to find pattern [NOTONE] in Grok's pattern dictionary"));
}
@ -70,7 +71,7 @@ public class GrokProcessorTests extends ESTestCase {
originalDoc.setFieldValue(fieldName, fieldName);
IngestDocument doc = new IngestDocument(originalDoc);
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.emptyMap(),
Collections.singletonList(fieldName), fieldName, false, false);
Collections.singletonList(fieldName), fieldName, false, false, ThreadWatchdog.noop());
processor.execute(doc);
assertThat(doc, equalTo(originalDoc));
}
@ -80,7 +81,7 @@ public class GrokProcessorTests extends ESTestCase {
IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
doc.setFieldValue(fieldName, null);
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"),
Collections.singletonList("%{ONE:one}"), fieldName, false, false);
Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadWatchdog.noop());
Exception e = expectThrows(Exception.class, () -> processor.execute(doc));
assertThat(e.getMessage(), equalTo("field [" + fieldName + "] is null, cannot process it."));
}
@ -91,7 +92,7 @@ public class GrokProcessorTests extends ESTestCase {
originalIngestDocument.setFieldValue(fieldName, null);
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"),
Collections.singletonList("%{ONE:one}"), fieldName, false, true);
Collections.singletonList("%{ONE:one}"), fieldName, false, true, ThreadWatchdog.noop());
processor.execute(ingestDocument);
assertIngestDocument(originalIngestDocument, ingestDocument);
}
@ -101,7 +102,7 @@ public class GrokProcessorTests extends ESTestCase {
IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
doc.setFieldValue(fieldName, 1);
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"),
Collections.singletonList("%{ONE:one}"), fieldName, false, false);
Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadWatchdog.noop());
Exception e = expectThrows(Exception.class, () -> processor.execute(doc));
assertThat(e.getMessage(), equalTo("field [" + fieldName + "] of type [java.lang.Integer] cannot be cast to [java.lang.String]"));
}
@ -111,7 +112,7 @@ public class GrokProcessorTests extends ESTestCase {
IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
doc.setFieldValue(fieldName, 1);
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"),
Collections.singletonList("%{ONE:one}"), fieldName, false, true);
Collections.singletonList("%{ONE:one}"), fieldName, false, true, ThreadWatchdog.noop());
Exception e = expectThrows(Exception.class, () -> processor.execute(doc));
assertThat(e.getMessage(), equalTo("field [" + fieldName + "] of type [java.lang.Integer] cannot be cast to [java.lang.String]"));
}
@ -120,7 +121,7 @@ public class GrokProcessorTests extends ESTestCase {
String fieldName = "foo.bar";
IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"),
Collections.singletonList("%{ONE:one}"), fieldName, false, false);
Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadWatchdog.noop());
Exception e = expectThrows(Exception.class, () -> processor.execute(doc));
assertThat(e.getMessage(), equalTo("field [foo] not present as part of path [foo.bar]"));
}
@ -130,7 +131,7 @@ public class GrokProcessorTests extends ESTestCase {
IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"),
Collections.singletonList("%{ONE:one}"), fieldName, false, true);
Collections.singletonList("%{ONE:one}"), fieldName, false, true, ThreadWatchdog.noop());
processor.execute(ingestDocument);
assertIngestDocument(originalIngestDocument, ingestDocument);
}
@ -144,7 +145,7 @@ public class GrokProcessorTests extends ESTestCase {
patternBank.put("TWO", "2");
patternBank.put("THREE", "3");
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank,
Arrays.asList("%{ONE:one}", "%{TWO:two}", "%{THREE:three}"), fieldName, false, false);
Arrays.asList("%{ONE:one}", "%{TWO:two}", "%{THREE:three}"), fieldName, false, false, ThreadWatchdog.noop());
processor.execute(doc);
assertThat(doc.hasField("one"), equalTo(false));
assertThat(doc.getFieldValue("two", String.class), equalTo("2"));
@ -160,7 +161,7 @@ public class GrokProcessorTests extends ESTestCase {
patternBank.put("TWO", "2");
patternBank.put("THREE", "3");
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank,
Arrays.asList("%{ONE:one}", "%{TWO:two}", "%{THREE:three}"), fieldName, true, false);
Arrays.asList("%{ONE:one}", "%{TWO:two}", "%{THREE:three}"), fieldName, true, false, ThreadWatchdog.noop());
processor.execute(doc);
assertThat(doc.hasField("one"), equalTo(false));
assertThat(doc.getFieldValue("two", String.class), equalTo("2"));
@ -175,7 +176,7 @@ public class GrokProcessorTests extends ESTestCase {
Map<String, String> patternBank = new HashMap<>();
patternBank.put("ONE", "1");
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank,
Arrays.asList("%{ONE:one}"), fieldName, true, false);
Arrays.asList("%{ONE:one}"), fieldName, true, false, ThreadWatchdog.noop());
processor.execute(doc);
assertThat(doc.hasField("one"), equalTo(true));
assertThat(doc.getFieldValue("_ingest._grok_match_index", String.class), equalTo("0"));
@ -205,8 +206,8 @@ public class GrokProcessorTests extends ESTestCase {
patternBank.put("ONE", "1");
patternBank.put("TWO", "2");
patternBank.put("THREE", "3");
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank,
Arrays.asList("%{ONE:first}-%{TWO:second}", "%{ONE:first}-%{THREE:second}"), fieldName, randomBoolean(), randomBoolean());
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, Arrays.asList("%{ONE:first}-%{TWO:second}",
"%{ONE:first}-%{THREE:second}"), fieldName, randomBoolean(), randomBoolean(), ThreadWatchdog.noop());
processor.execute(doc);
assertThat(doc.getFieldValue("first", String.class), equalTo("1"));
assertThat(doc.getFieldValue("second", String.class), equalTo("3"));
@ -219,7 +220,8 @@ public class GrokProcessorTests extends ESTestCase {
Map<String, String> patternBank = new HashMap<>();
patternBank.put("ONETWO", "1|2");
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank,
Collections.singletonList("%{ONETWO:first}%{ONETWO:first}"), fieldName, randomBoolean(), randomBoolean());
Collections.singletonList("%{ONETWO:first}%{ONETWO:first}"), fieldName, randomBoolean(), randomBoolean(),
ThreadWatchdog.noop());
processor.execute(doc);
assertThat(doc.getFieldValue("first", String.class), equalTo("1"));
}
@ -232,7 +234,8 @@ public class GrokProcessorTests extends ESTestCase {
patternBank.put("ONETWO", "1|2");
patternBank.put("THREE", "3");
GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank,
Collections.singletonList("%{ONETWO:first}|%{THREE:second}"), fieldName, randomBoolean(), randomBoolean());
Collections.singletonList("%{ONETWO:first}|%{THREE:second}"), fieldName, randomBoolean(), randomBoolean(),
ThreadWatchdog.noop());
processor.execute(doc);
assertFalse(doc.hasField("first"));
assertThat(doc.getFieldValue("second", String.class), equalTo("3"));

View File

@ -24,8 +24,11 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.function.BiFunction;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.plugins.IngestPlugin;
@ -42,8 +45,10 @@ public class IngestService {
public IngestService(Settings settings, ThreadPool threadPool,
Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry,
List<IngestPlugin> ingestPlugins) {
Processor.Parameters parameters = new Processor.Parameters(env, scriptService,
analysisRegistry, threadPool.getThreadContext());
BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler =
(delay, command) -> threadPool.schedule(TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC, command);
Processor.Parameters parameters = new Processor.Parameters(env, scriptService, analysisRegistry,
threadPool.getThreadContext(), threadPool::relativeTimeInMillis, scheduler);
Map<String, Processor.Factory> processorFactories = new HashMap<>();
for (IngestPlugin ingestPlugin : ingestPlugins) {
Map<String, Processor.Factory> newProcessors = ingestPlugin.getProcessors(parameters);

View File

@ -25,6 +25,9 @@ import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.script.ScriptService;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;
/**
* A processor implementation may modify the data belonging to a document.
@ -94,13 +97,22 @@ public interface Processor {
* instances that have run prior to in ingest.
*/
public final ThreadContext threadContext;
public final LongSupplier relativeTimeSupplier;
/**
* Provides scheduler support
*/
public final BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler;
public Parameters(Environment env, ScriptService scriptService,
AnalysisRegistry analysisRegistry, ThreadContext threadContext) {
public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext,
LongSupplier relativeTimeSupplier, BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler) {
this.env = env;
this.scriptService = scriptService;
this.threadContext = threadContext;
this.analysisRegistry = analysisRegistry;
this.relativeTimeSupplier = relativeTimeSupplier;
this.scheduler = scheduler;
}
}