diff --git a/docs/reference/ingest/ingest-node.asciidoc b/docs/reference/ingest/ingest-node.asciidoc index 182a3d5fc78..009a67699e3 100644 --- a/docs/reference/ingest/ingest-node.asciidoc +++ b/docs/reference/ingest/ingest-node.asciidoc @@ -1512,6 +1512,24 @@ The above request will return a response body containing a key-value representat This can be useful to reference as the built-in patterns change across versions. +[[grok-watchdog]] +==== Grok watchdog + +Grok expressions that take too long to execute are interrupted and +the grok processor then fails with an exception. The grok +processor has a watchdog thread that determines when evaluation of +a grok expression takes too long and is controlled by the following +settings: + +[[grok-watchdog-options]] +.Grok watchdog settings +[options="header"] +|====== +| Name | Default | Description +| `ingest.grok.watchdog.interval` | 1s | How often to check whether there are grok evaluations that take longer than the maximum allowed execution time. +| `ingest.grok.watchdog.max_execution_time` | 1s | The maximum allowed execution of a grok expression evaluation. +|====== + [[gsub-processor]] === Gsub Processor Converts a string field by applying a regular expression and a replacement. diff --git a/libs/grok/src/main/java/org/elasticsearch/grok/Grok.java b/libs/grok/src/main/java/org/elasticsearch/grok/Grok.java index 3800c7711a2..02388d838bc 100644 --- a/libs/grok/src/main/java/org/elasticsearch/grok/Grok.java +++ b/libs/grok/src/main/java/org/elasticsearch/grok/Grok.java @@ -76,15 +76,24 @@ public final class Grok { private final Map patternBank; private final boolean namedCaptures; private final Regex compiledExpression; + private final ThreadWatchdog threadWatchdog; public Grok(Map patternBank, String grokPattern) { - this(patternBank, grokPattern, true); + this(patternBank, grokPattern, true, ThreadWatchdog.noop()); + } + + public Grok(Map patternBank, String grokPattern, ThreadWatchdog threadWatchdog) { + this(patternBank, grokPattern, true, threadWatchdog); + } + + Grok(Map patternBank, String grokPattern, boolean namedCaptures) { + this(patternBank, grokPattern, namedCaptures, ThreadWatchdog.noop()); } - @SuppressWarnings("unchecked") - Grok(Map patternBank, String grokPattern, boolean namedCaptures) { + private Grok(Map patternBank, String grokPattern, boolean namedCaptures, ThreadWatchdog threadWatchdog) { this.patternBank = patternBank; this.namedCaptures = namedCaptures; + this.threadWatchdog = threadWatchdog; for (Map.Entry entry : patternBank.entrySet()) { String name = entry.getKey(); @@ -163,7 +172,13 @@ public final class Grok { byte[] grokPatternBytes = grokPattern.getBytes(StandardCharsets.UTF_8); Matcher matcher = GROK_PATTERN_REGEX.matcher(grokPatternBytes); - int result = matcher.search(0, grokPatternBytes.length, Option.NONE); + int result; + try { + threadWatchdog.register(); + result = matcher.search(0, grokPatternBytes.length, Option.NONE); + } finally { + threadWatchdog.unregister(); + } if (result != -1) { Region region = matcher.getEagerRegion(); String namedPatternRef = groupMatch(NAME_GROUP, region, grokPattern); @@ -205,7 +220,13 @@ public final class Grok { */ public boolean match(String text) { Matcher matcher = compiledExpression.matcher(text.getBytes(StandardCharsets.UTF_8)); - int result = matcher.search(0, text.length(), Option.DEFAULT); + int result; + try { + threadWatchdog.register(); + result = matcher.search(0, text.length(), Option.DEFAULT); + } finally { + threadWatchdog.unregister(); + } return (result != -1); } @@ -220,8 +241,20 @@ public final class Grok { byte[] textAsBytes = text.getBytes(StandardCharsets.UTF_8); Map fields = new HashMap<>(); Matcher matcher = compiledExpression.matcher(textAsBytes); - int result = matcher.search(0, textAsBytes.length, Option.DEFAULT); - if (result != -1 && compiledExpression.numberOfNames() > 0) { + int result; + try { + threadWatchdog.register(); + result = matcher.search(0, textAsBytes.length, Option.DEFAULT); + } finally { + threadWatchdog.unregister(); + } + if (result == Matcher.INTERRUPTED) { + throw new RuntimeException("grok pattern matching was interrupted after [" + + threadWatchdog.maxExecutionTimeInMillis() + "] ms"); + } else if (result == Matcher.FAILED) { + // TODO: I think we should throw an error here? + return null; + } else if (compiledExpression.numberOfNames() > 0) { Region region = matcher.getEagerRegion(); for (Iterator entry = compiledExpression.namedBackrefIterator(); entry.hasNext();) { NameEntry e = entry.next(); @@ -235,13 +268,9 @@ public final class Grok { break; } } - } - return fields; - } else if (result != -1) { - return fields; } - return null; + return fields; } public static Map getBuiltinPatterns() { diff --git a/libs/grok/src/main/java/org/elasticsearch/grok/ThreadWatchdog.java b/libs/grok/src/main/java/org/elasticsearch/grok/ThreadWatchdog.java new file mode 100644 index 00000000000..d0de7637d2c --- /dev/null +++ b/libs/grok/src/main/java/org/elasticsearch/grok/ThreadWatchdog.java @@ -0,0 +1,148 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.grok; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; +import java.util.function.BiFunction; +import java.util.function.LongSupplier; + +/** + * Protects against long running operations that happen between the register and unregister invocations. + * Threads that invoke {@link #register()}, but take too long to invoke the {@link #unregister()} method + * will be interrupted. + * + * This is needed for Joni's {@link org.joni.Matcher#search(int, int, int)} method, because + * it can end up spinning endlessly if the regular expression is too complex. Joni has checks + * that for every 30k iterations it checks if the current thread is interrupted and if so + * returns {@link org.joni.Matcher#INTERRUPTED}. + */ +public interface ThreadWatchdog { + + /** + * Registers the current thread and interrupts the current thread + * if the takes too long for this thread to invoke {@link #unregister()}. + */ + void register(); + + /** + * @return The maximum allowed time in milliseconds for a thread to invoke {@link #unregister()} + * after {@link #register()} has been invoked before this ThreadWatchDog starts to interrupting that thread. + */ + long maxExecutionTimeInMillis(); + + /** + * Unregisters the current thread and prevents it from being interrupted. + */ + void unregister(); + + /** + * Returns an implementation that checks for each fixed interval if there are threads that have invoked {@link #register()} + * and not {@link #unregister()} and have been in this state for longer than the specified max execution interval and + * then interrupts these threads. + * + * @param interval The fixed interval to check if there are threads to interrupt + * @param maxExecutionTime The time a thread has the execute an operation. + * @param relativeTimeSupplier A supplier that returns relative time + * @param scheduler A scheduler that is able to execute a command for each fixed interval + */ + static ThreadWatchdog newInstance(long interval, + long maxExecutionTime, + LongSupplier relativeTimeSupplier, + BiFunction> scheduler) { + return new Default(interval, maxExecutionTime, relativeTimeSupplier, scheduler); + } + + /** + * @return A noop implementation that does not interrupt threads and is useful for testing and pre-defined grok expressions. + */ + static ThreadWatchdog noop() { + return Noop.INSTANCE; + } + + class Noop implements ThreadWatchdog { + + private static final Noop INSTANCE = new Noop(); + + private Noop() { + } + + @Override + public void register() { + } + + @Override + public long maxExecutionTimeInMillis() { + return Long.MAX_VALUE; + } + + @Override + public void unregister() { + } + } + + class Default implements ThreadWatchdog { + + private final long interval; + private final long maxExecutionTime; + private final LongSupplier relativeTimeSupplier; + private final BiFunction> scheduler; + final ConcurrentHashMap registry = new ConcurrentHashMap<>(); + + private Default(long interval, + long maxExecutionTime, + LongSupplier relativeTimeSupplier, + BiFunction> scheduler) { + this.interval = interval; + this.maxExecutionTime = maxExecutionTime; + this.relativeTimeSupplier = relativeTimeSupplier; + this.scheduler = scheduler; + scheduler.apply(interval, this::interruptLongRunningExecutions); + } + + public void register() { + Long previousValue = registry.put(Thread.currentThread(), relativeTimeSupplier.getAsLong()); + assert previousValue == null; + } + + @Override + public long maxExecutionTimeInMillis() { + return maxExecutionTime; + } + + public void unregister() { + Long previousValue = registry.remove(Thread.currentThread()); + assert previousValue != null; + } + + private void interruptLongRunningExecutions() { + final long currentRelativeTime = relativeTimeSupplier.getAsLong(); + for (Map.Entry entry : registry.entrySet()) { + if ((currentRelativeTime - entry.getValue()) > maxExecutionTime) { + entry.getKey().interrupt(); + // not removing the entry here, this happens in the unregister() method. + } + } + scheduler.apply(interval, this::interruptLongRunningExecutions); + } + + } + +} diff --git a/libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java b/libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java index aa95f6fdb73..8d79aa290eb 100644 --- a/libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java +++ b/libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java @@ -20,7 +20,6 @@ package org.elasticsearch.grok; import org.elasticsearch.test.ESTestCase; -import org.junit.Before; import java.util.Arrays; import java.util.Collections; @@ -28,6 +27,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -35,12 +37,7 @@ import static org.hamcrest.Matchers.nullValue; public class GrokTests extends ESTestCase { - private Map basePatterns; - - @Before - public void setup() { - basePatterns = Grok.getBuiltinPatterns(); - } + private static final Map basePatterns = Grok.getBuiltinPatterns(); public void testMatchWithoutCaptures() { String line = "value"; @@ -415,4 +412,31 @@ public class GrokTests extends ESTestCase { expected.put("num", "1"); assertThat(grok.captures("12"), equalTo(expected)); } + + public void testExponentialExpressions() { + AtomicBoolean run = new AtomicBoolean(true); // to avoid a lingering thread when test has completed + + String grokPattern = "Bonsuche mit folgender Anfrage: Belegart->\\[%{WORD:param2},(?(\\s*%{NOTSPACE})*)\\] " + + "Zustand->ABGESCHLOSSEN Kassennummer->%{WORD:param9} Bonnummer->%{WORD:param10} Datum->%{DATESTAMP_OTHER:param11}"; + String logLine = "Bonsuche mit folgender Anfrage: Belegart->[EINGESCHRAENKTER_VERKAUF, VERKAUF, NACHERFASSUNG] " + + "Zustand->ABGESCHLOSSEN Kassennummer->2 Bonnummer->6362 Datum->Mon Jan 08 00:00:00 UTC 2018"; + BiFunction> scheduler = (delay, command) -> { + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + Thread t = new Thread(() -> { + if (run.get()) { + command.run(); + } + }); + t.start(); + return null; + }; + Grok grok = new Grok(basePatterns, grokPattern, ThreadWatchdog.newInstance(10, 200, System::currentTimeMillis, scheduler)); + Exception e = expectThrows(RuntimeException.class, () -> grok.captures(logLine)); + run.set(false); + assertThat(e.getMessage(), equalTo("grok pattern matching was interrupted after [200] ms")); + } } diff --git a/libs/grok/src/test/java/org/elasticsearch/grok/ThreadWatchdogTests.java b/libs/grok/src/test/java/org/elasticsearch/grok/ThreadWatchdogTests.java new file mode 100644 index 00000000000..46faa4ae05d --- /dev/null +++ b/libs/grok/src/test/java/org/elasticsearch/grok/ThreadWatchdogTests.java @@ -0,0 +1,70 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.grok; + +import org.elasticsearch.test.ESTestCase; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.hamcrest.Matchers.is; + +public class ThreadWatchdogTests extends ESTestCase { + + public void testInterrupt() throws Exception { + AtomicBoolean run = new AtomicBoolean(true); // to avoid a lingering thread when test has completed + ThreadWatchdog watchdog = ThreadWatchdog.newInstance(10, 100, System::currentTimeMillis, (delay, command) -> { + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + Thread thread = new Thread(() -> { + if (run.get()) { + command.run(); + } + }); + thread.start(); + return null; + }); + + Map registry = ((ThreadWatchdog.Default) watchdog).registry; + assertThat(registry.size(), is(0)); + // need to call #register() method on a different thread, assertBusy() fails if current thread gets interrupted + AtomicBoolean interrupted = new AtomicBoolean(false); + Thread thread = new Thread(() -> { + Thread currentThread = Thread.currentThread(); + watchdog.register(); + while (currentThread.isInterrupted() == false) {} + interrupted.set(true); + while (run.get()) {} // wait here so that the size of the registry can be asserted + watchdog.unregister(); + }); + thread.start(); + assertBusy(() -> { + assertThat(interrupted.get(), is(true)); + assertThat(registry.size(), is(1)); + }); + run.set(false); + assertBusy(() -> { + assertThat(registry.size(), is(0)); + }); + } + +} diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java index 8d1d2127e72..7bb3ebfba6e 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java @@ -20,6 +20,7 @@ package org.elasticsearch.ingest.common; import org.elasticsearch.grok.Grok; +import org.elasticsearch.grok.ThreadWatchdog; import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; @@ -43,11 +44,11 @@ public final class GrokProcessor extends AbstractProcessor { private final boolean ignoreMissing; GrokProcessor(String tag, Map patternBank, List matchPatterns, String matchField, - boolean traceMatch, boolean ignoreMissing) { + boolean traceMatch, boolean ignoreMissing, ThreadWatchdog threadWatchdog) { super(tag); this.matchField = matchField; this.matchPatterns = matchPatterns; - this.grok = new Grok(patternBank, combinePatterns(matchPatterns, traceMatch)); + this.grok = new Grok(patternBank, combinePatterns(matchPatterns, traceMatch), threadWatchdog); this.traceMatch = traceMatch; this.ignoreMissing = ignoreMissing; } @@ -132,9 +133,11 @@ public final class GrokProcessor extends AbstractProcessor { public static final class Factory implements Processor.Factory { private final Map builtinPatterns; + private final ThreadWatchdog threadWatchdog; - public Factory(Map builtinPatterns) { + public Factory(Map builtinPatterns, ThreadWatchdog threadWatchdog) { this.builtinPatterns = builtinPatterns; + this.threadWatchdog = threadWatchdog; } @Override @@ -155,7 +158,8 @@ public final class GrokProcessor extends AbstractProcessor { } try { - return new GrokProcessor(processorTag, patternBank, matchPatterns, matchField, traceMatch, ignoreMissing); + return new GrokProcessor(processorTag, patternBank, matchPatterns, matchField, traceMatch, ignoreMissing, + threadWatchdog); } catch (Exception e) { throw newConfigurationException(TYPE, processorTag, "patterns", "Invalid regex pattern found in: " + matchPatterns + ". " + e.getMessage()); diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index a29c994f10d..591060098b7 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -25,9 +25,12 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.grok.Grok; +import org.elasticsearch.grok.ThreadWatchdog; import org.elasticsearch.ingest.Processor; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.IngestPlugin; @@ -45,6 +48,10 @@ import java.util.function.Supplier; public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPlugin { static final Map GROK_PATTERNS = Grok.getBuiltinPatterns(); + static final Setting WATCHDOG_INTERVAL = + Setting.timeSetting("ingest.grok.watchdog.interval", TimeValue.timeValueSeconds(1), Setting.Property.NodeScope); + static final Setting WATCHDOG_MAX_EXECUTION_TIME = + Setting.timeSetting("ingest.grok.watchdog.max_execution_time", TimeValue.timeValueSeconds(1), Setting.Property.NodeScope); public IngestCommonPlugin() { } @@ -68,7 +75,7 @@ public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPl processors.put(ForEachProcessor.TYPE, new ForEachProcessor.Factory()); processors.put(DateIndexNameProcessor.TYPE, new DateIndexNameProcessor.Factory()); processors.put(SortProcessor.TYPE, new SortProcessor.Factory()); - processors.put(GrokProcessor.TYPE, new GrokProcessor.Factory(GROK_PATTERNS)); + processors.put(GrokProcessor.TYPE, new GrokProcessor.Factory(GROK_PATTERNS, createGrokThreadWatchdog(parameters))); processors.put(ScriptProcessor.TYPE, new ScriptProcessor.Factory(parameters.scriptService)); processors.put(DotExpanderProcessor.TYPE, new DotExpanderProcessor.Factory()); processors.put(JsonProcessor.TYPE, new JsonProcessor.Factory()); @@ -89,5 +96,16 @@ public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPl Supplier nodesInCluster) { return Arrays.asList(new GrokProcessorGetAction.RestAction(settings, restController)); } + + @Override + public List> getSettings() { + return Arrays.asList(WATCHDOG_INTERVAL, WATCHDOG_MAX_EXECUTION_TIME); + } + + private static ThreadWatchdog createGrokThreadWatchdog(Processor.Parameters parameters) { + long intervalMillis = WATCHDOG_INTERVAL.get(parameters.env.settings()).getMillis(); + long maxExecutionTimeMillis = WATCHDOG_MAX_EXECUTION_TIME.get(parameters.env.settings()).getMillis(); + return ThreadWatchdog.newInstance(intervalMillis, maxExecutionTimeMillis, parameters.relativeTimeSupplier, parameters.scheduler); + } } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorFactoryTests.java index 4cac94cd5b5..f35fa34eec4 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorFactoryTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.ingest.common; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.grok.ThreadWatchdog; import org.elasticsearch.test.ESTestCase; import java.util.Collections; @@ -33,7 +34,7 @@ import static org.hamcrest.Matchers.notNullValue; public class GrokProcessorFactoryTests extends ESTestCase { public void testBuild() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop()); Map config = new HashMap<>(); config.put("field", "_field"); @@ -47,7 +48,7 @@ public class GrokProcessorFactoryTests extends ESTestCase { } public void testBuildWithIgnoreMissing() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop()); Map config = new HashMap<>(); config.put("field", "_field"); @@ -62,7 +63,7 @@ public class GrokProcessorFactoryTests extends ESTestCase { } public void testBuildMissingField() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop()); Map config = new HashMap<>(); config.put("patterns", Collections.singletonList("(?\\w+)")); ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, config)); @@ -70,7 +71,7 @@ public class GrokProcessorFactoryTests extends ESTestCase { } public void testBuildMissingPatterns() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop()); Map config = new HashMap<>(); config.put("field", "foo"); ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> factory.create(null, null, config)); @@ -78,7 +79,7 @@ public class GrokProcessorFactoryTests extends ESTestCase { } public void testBuildEmptyPatternsList() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop()); Map config = new HashMap<>(); config.put("field", "foo"); config.put("patterns", Collections.emptyList()); @@ -87,7 +88,7 @@ public class GrokProcessorFactoryTests extends ESTestCase { } public void testCreateWithCustomPatterns() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop()); Map config = new HashMap<>(); config.put("field", "_field"); @@ -100,7 +101,7 @@ public class GrokProcessorFactoryTests extends ESTestCase { } public void testCreateWithInvalidPattern() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop()); Map config = new HashMap<>(); config.put("field", "_field"); config.put("patterns", Collections.singletonList("[")); @@ -109,7 +110,7 @@ public class GrokProcessorFactoryTests extends ESTestCase { } public void testCreateWithInvalidPatternDefinition() throws Exception { - GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap()); + GrokProcessor.Factory factory = new GrokProcessor.Factory(Collections.emptyMap(), ThreadWatchdog.noop()); Map config = new HashMap<>(); config.put("field", "_field"); config.put("patterns", Collections.singletonList("%{MY_PATTERN:name}!")); diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorTests.java index 37c26db4b74..0eba79523ac 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/GrokProcessorTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.ingest.common; +import org.elasticsearch.grok.ThreadWatchdog; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.test.ESTestCase; @@ -39,7 +40,7 @@ public class GrokProcessorTests extends ESTestCase { IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); doc.setFieldValue(fieldName, "1"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, false); + Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadWatchdog.noop()); processor.execute(doc); assertThat(doc.getFieldValue("one", String.class), equalTo("1")); } @@ -49,7 +50,7 @@ public class GrokProcessorTests extends ESTestCase { IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); doc.setFieldValue(fieldName, "23"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, false); + Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadWatchdog.noop()); Exception e = expectThrows(Exception.class, () -> processor.execute(doc)); assertThat(e.getMessage(), equalTo("Provided Grok expressions do not match field value: [23]")); } @@ -60,7 +61,7 @@ public class GrokProcessorTests extends ESTestCase { doc.setFieldValue(fieldName, "23"); Exception e = expectThrows(IllegalArgumentException.class, () -> new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), Collections.singletonList("%{NOTONE:not_one}"), fieldName, - false, false)); + false, false, ThreadWatchdog.noop())); assertThat(e.getMessage(), equalTo("Unable to find pattern [NOTONE] in Grok's pattern dictionary")); } @@ -70,7 +71,7 @@ public class GrokProcessorTests extends ESTestCase { originalDoc.setFieldValue(fieldName, fieldName); IngestDocument doc = new IngestDocument(originalDoc); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.emptyMap(), - Collections.singletonList(fieldName), fieldName, false, false); + Collections.singletonList(fieldName), fieldName, false, false, ThreadWatchdog.noop()); processor.execute(doc); assertThat(doc, equalTo(originalDoc)); } @@ -80,7 +81,7 @@ public class GrokProcessorTests extends ESTestCase { IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); doc.setFieldValue(fieldName, null); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, false); + Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadWatchdog.noop()); Exception e = expectThrows(Exception.class, () -> processor.execute(doc)); assertThat(e.getMessage(), equalTo("field [" + fieldName + "] is null, cannot process it.")); } @@ -91,7 +92,7 @@ public class GrokProcessorTests extends ESTestCase { originalIngestDocument.setFieldValue(fieldName, null); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, true); + Collections.singletonList("%{ONE:one}"), fieldName, false, true, ThreadWatchdog.noop()); processor.execute(ingestDocument); assertIngestDocument(originalIngestDocument, ingestDocument); } @@ -101,7 +102,7 @@ public class GrokProcessorTests extends ESTestCase { IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); doc.setFieldValue(fieldName, 1); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, false); + Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadWatchdog.noop()); Exception e = expectThrows(Exception.class, () -> processor.execute(doc)); assertThat(e.getMessage(), equalTo("field [" + fieldName + "] of type [java.lang.Integer] cannot be cast to [java.lang.String]")); } @@ -111,7 +112,7 @@ public class GrokProcessorTests extends ESTestCase { IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); doc.setFieldValue(fieldName, 1); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, true); + Collections.singletonList("%{ONE:one}"), fieldName, false, true, ThreadWatchdog.noop()); Exception e = expectThrows(Exception.class, () -> processor.execute(doc)); assertThat(e.getMessage(), equalTo("field [" + fieldName + "] of type [java.lang.Integer] cannot be cast to [java.lang.String]")); } @@ -120,7 +121,7 @@ public class GrokProcessorTests extends ESTestCase { String fieldName = "foo.bar"; IngestDocument doc = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, false); + Collections.singletonList("%{ONE:one}"), fieldName, false, false, ThreadWatchdog.noop()); Exception e = expectThrows(Exception.class, () -> processor.execute(doc)); assertThat(e.getMessage(), equalTo("field [foo] not present as part of path [foo.bar]")); } @@ -130,7 +131,7 @@ public class GrokProcessorTests extends ESTestCase { IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), Collections.singletonMap("ONE", "1"), - Collections.singletonList("%{ONE:one}"), fieldName, false, true); + Collections.singletonList("%{ONE:one}"), fieldName, false, true, ThreadWatchdog.noop()); processor.execute(ingestDocument); assertIngestDocument(originalIngestDocument, ingestDocument); } @@ -144,7 +145,7 @@ public class GrokProcessorTests extends ESTestCase { patternBank.put("TWO", "2"); patternBank.put("THREE", "3"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, - Arrays.asList("%{ONE:one}", "%{TWO:two}", "%{THREE:three}"), fieldName, false, false); + Arrays.asList("%{ONE:one}", "%{TWO:two}", "%{THREE:three}"), fieldName, false, false, ThreadWatchdog.noop()); processor.execute(doc); assertThat(doc.hasField("one"), equalTo(false)); assertThat(doc.getFieldValue("two", String.class), equalTo("2")); @@ -160,7 +161,7 @@ public class GrokProcessorTests extends ESTestCase { patternBank.put("TWO", "2"); patternBank.put("THREE", "3"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, - Arrays.asList("%{ONE:one}", "%{TWO:two}", "%{THREE:three}"), fieldName, true, false); + Arrays.asList("%{ONE:one}", "%{TWO:two}", "%{THREE:three}"), fieldName, true, false, ThreadWatchdog.noop()); processor.execute(doc); assertThat(doc.hasField("one"), equalTo(false)); assertThat(doc.getFieldValue("two", String.class), equalTo("2")); @@ -175,7 +176,7 @@ public class GrokProcessorTests extends ESTestCase { Map patternBank = new HashMap<>(); patternBank.put("ONE", "1"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, - Arrays.asList("%{ONE:one}"), fieldName, true, false); + Arrays.asList("%{ONE:one}"), fieldName, true, false, ThreadWatchdog.noop()); processor.execute(doc); assertThat(doc.hasField("one"), equalTo(true)); assertThat(doc.getFieldValue("_ingest._grok_match_index", String.class), equalTo("0")); @@ -205,8 +206,8 @@ public class GrokProcessorTests extends ESTestCase { patternBank.put("ONE", "1"); patternBank.put("TWO", "2"); patternBank.put("THREE", "3"); - GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, - Arrays.asList("%{ONE:first}-%{TWO:second}", "%{ONE:first}-%{THREE:second}"), fieldName, randomBoolean(), randomBoolean()); + GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, Arrays.asList("%{ONE:first}-%{TWO:second}", + "%{ONE:first}-%{THREE:second}"), fieldName, randomBoolean(), randomBoolean(), ThreadWatchdog.noop()); processor.execute(doc); assertThat(doc.getFieldValue("first", String.class), equalTo("1")); assertThat(doc.getFieldValue("second", String.class), equalTo("3")); @@ -219,7 +220,8 @@ public class GrokProcessorTests extends ESTestCase { Map patternBank = new HashMap<>(); patternBank.put("ONETWO", "1|2"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, - Collections.singletonList("%{ONETWO:first}%{ONETWO:first}"), fieldName, randomBoolean(), randomBoolean()); + Collections.singletonList("%{ONETWO:first}%{ONETWO:first}"), fieldName, randomBoolean(), randomBoolean(), + ThreadWatchdog.noop()); processor.execute(doc); assertThat(doc.getFieldValue("first", String.class), equalTo("1")); } @@ -232,7 +234,8 @@ public class GrokProcessorTests extends ESTestCase { patternBank.put("ONETWO", "1|2"); patternBank.put("THREE", "3"); GrokProcessor processor = new GrokProcessor(randomAlphaOfLength(10), patternBank, - Collections.singletonList("%{ONETWO:first}|%{THREE:second}"), fieldName, randomBoolean(), randomBoolean()); + Collections.singletonList("%{ONETWO:first}|%{THREE:second}"), fieldName, randomBoolean(), randomBoolean(), + ThreadWatchdog.noop()); processor.execute(doc); assertFalse(doc.hasField("first")); assertThat(doc.getFieldValue("second", String.class), equalTo("3")); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index ad2b8643f7a..46b11f7ac14 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -24,8 +24,11 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ScheduledFuture; +import java.util.function.BiFunction; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.env.Environment; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.plugins.IngestPlugin; @@ -42,8 +45,10 @@ public class IngestService { public IngestService(Settings settings, ThreadPool threadPool, Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, List ingestPlugins) { - Processor.Parameters parameters = new Processor.Parameters(env, scriptService, - analysisRegistry, threadPool.getThreadContext()); + BiFunction> scheduler = + (delay, command) -> threadPool.schedule(TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC, command); + Processor.Parameters parameters = new Processor.Parameters(env, scriptService, analysisRegistry, + threadPool.getThreadContext(), threadPool::relativeTimeInMillis, scheduler); Map processorFactories = new HashMap<>(); for (IngestPlugin ingestPlugin : ingestPlugins) { Map newProcessors = ingestPlugin.getProcessors(parameters); diff --git a/server/src/main/java/org/elasticsearch/ingest/Processor.java b/server/src/main/java/org/elasticsearch/ingest/Processor.java index 39d74fb09a9..c318d478814 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Processor.java +++ b/server/src/main/java/org/elasticsearch/ingest/Processor.java @@ -25,6 +25,9 @@ import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.script.ScriptService; import java.util.Map; +import java.util.concurrent.ScheduledFuture; +import java.util.function.BiFunction; +import java.util.function.LongSupplier; /** * A processor implementation may modify the data belonging to a document. @@ -94,13 +97,22 @@ public interface Processor { * instances that have run prior to in ingest. */ public final ThreadContext threadContext; + + public final LongSupplier relativeTimeSupplier; + + /** + * Provides scheduler support + */ + public final BiFunction> scheduler; - public Parameters(Environment env, ScriptService scriptService, - AnalysisRegistry analysisRegistry, ThreadContext threadContext) { + public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext, + LongSupplier relativeTimeSupplier, BiFunction> scheduler) { this.env = env; this.scriptService = scriptService; this.threadContext = threadContext; this.analysisRegistry = analysisRegistry; + this.relativeTimeSupplier = relativeTimeSupplier; + this.scheduler = scheduler; } }