From 72ef1acdc0a1c7200cbc6bf06492e3f31e0564a5 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Tue, 14 Feb 2017 12:26:21 -0500 Subject: [PATCH] Move domainSplit injection to ScrollDataExtractor (elastic/x-pack-elasticsearch#554) By injecting immediately before use, we don't have to store it in the config and return huge blobs to the user when they request the config Original commit: elastic/x-pack-elasticsearch@2e53c930eace635785813ac05573b262bfc2a071 --- .../xpack/ml/datafeed/DatafeedConfig.java | 19 +----- .../extractor/scroll/ScrollDataExtractor.java | 24 ++++++- .../ml/datafeed/DatafeedConfigTests.java | 25 -------- .../scroll/ScrollDataExtractorTests.java | 63 ++++++++++++++++++- 4 files changed, 85 insertions(+), 46 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfig.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfig.java index 055c42fdeab..cba3388ee0b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfig.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfig.java @@ -414,24 +414,7 @@ public class DatafeedConfig extends AbstractDiffable implements public void setScriptFields(List scriptFields) { List sorted = new ArrayList<>(); for (SearchSourceBuilder.ScriptField scriptField : scriptFields) { - String script = scriptField.script().getIdOrCode(); - - if (script.contains("domainSplit(")) { - String modifiedCode = DomainSplitFunction.function + "\n" + script; - Map modifiedParams = new HashMap<>(scriptField.script().getParams().size() - + DomainSplitFunction.params.size()); - - modifiedParams.putAll(scriptField.script().getParams()); - modifiedParams.putAll(DomainSplitFunction.params); - - Script newScript = new Script(scriptField.script().getType(), scriptField.script().getLang(), - modifiedCode, modifiedParams); - - sorted.add(new SearchSourceBuilder.ScriptField(scriptField.fieldName(), newScript, scriptField.ignoreFailure())); - } else { - sorted.add(scriptField); - } - + sorted.add(scriptField); } sorted.sort(Comparator.comparing(SearchSourceBuilder.ScriptField::fieldName)); this.scriptFields = sorted; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java index a7df09b0dcc..78dde7d5c80 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java @@ -14,16 +14,21 @@ import org.elasticsearch.action.search.SearchScrollAction; import org.elasticsearch.client.Client; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.script.Script; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.StoredFieldsContext; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.ml.datafeed.extractor.ExtractorUtils; +import org.elasticsearch.xpack.ml.utils.DomainSplitFunction; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; import java.util.Optional; @@ -111,10 +116,27 @@ class ScrollDataExtractor implements DataExtractor { } else { searchRequestBuilder.setFetchSource(sourceFields, null); } - context.scriptFields.forEach(f -> searchRequestBuilder.addScriptField(f.fieldName(), f.script())); + context.scriptFields.forEach(f -> searchRequestBuilder.addScriptField( + f.fieldName(), injectDomainSplit(f.script()))); return searchRequestBuilder; } + private Script injectDomainSplit(Script script) { + String code = script.getIdOrCode(); + if (code.contains("domainSplit(")) { + String modifiedCode = DomainSplitFunction.function + "\n" + script; + Map modifiedParams = new HashMap<>(script.getParams().size() + + DomainSplitFunction.params.size()); + + modifiedParams.putAll(script.getParams()); + modifiedParams.putAll(DomainSplitFunction.params); + + return new Script(script.getType(), script.getLang(), + modifiedCode, modifiedParams); + } + return script; + } + private InputStream processSearchResponse(SearchResponse searchResponse) throws IOException { ExtractorUtils.checkSearchWasSuccessful(context.jobId, searchResponse); scrollId = searchResponse.getScrollId(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigTests.java index b8d8c8900e7..ccaacb1f2e0 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigTests.java @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -290,30 +289,6 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase scriptFields = config.getScriptFields(); - - assertThat(scriptFields.size(), equalTo(2)); - assertThat(scriptFields.get(0).fieldName(), equalTo("script1")); - assertThat(scriptFields.get(0).script().getIdOrCode(), equalTo("return 1+1;")); - assertFalse(scriptFields.get(0).script().getParams().containsKey("exact")); - - assertThat(scriptFields.get(1).fieldName(), equalTo("script2")); - assertThat(scriptFields.get(1).script().getIdOrCode(), containsString("List domainSplit(String host, Map params)")); - assertTrue(scriptFields.get(1).script().getParams().containsKey("exact")); - } - public static String randomValidDatafeedId() { CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray()); return generator.ofCodePointsLength(random(), 10, 10); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java index eb1ebd1489d..b065955bc75 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractorTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.search.SearchHitField; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.script.Script; import org.junit.Before; import java.io.BufferedReader; @@ -58,7 +59,11 @@ public class ScrollDataExtractorTests extends ESTestCase { private SearchResponse nextResponse; TestDataExtractor(long start, long end) { - super(client, createContext(start, end)); + this(createContext(start, end)); + } + + TestDataExtractor(ScrollDataExtractorContext context) { + super(client, context); } @Override @@ -270,6 +275,60 @@ public class ScrollDataExtractorTests extends ESTestCase { assertThat(e.getMessage(), equalTo("[" + jobId + "] Search request encountered [1] unavailable shards")); } + public void testDomainSplitScriptField() throws IOException { + + SearchSourceBuilder.ScriptField withoutSplit = new SearchSourceBuilder.ScriptField( + "script1", new Script("return 1+1;"), false); + SearchSourceBuilder.ScriptField withSplit = new SearchSourceBuilder.ScriptField( + "script2", new Script("return domainSplit('foo.com', params);"), false); + + List sFields = Arrays.asList(withoutSplit, withSplit); + ScrollDataExtractorContext context = new ScrollDataExtractorContext(jobId, extractedFields, indexes, + types, query, sFields, scrollSize, 1000, 2000); + + TestDataExtractor extractor = new TestDataExtractor(context); + + SearchResponse response1 = createSearchResponse( + Arrays.asList(1100L, 1200L), + Arrays.asList("a1", "a2"), + Arrays.asList("b1", "b2") + ); + extractor.setNextResponse(response1); + + assertThat(extractor.hasNext(), is(true)); + Optional stream = extractor.next(); + assertThat(stream.isPresent(), is(true)); + String expectedStream = "{\"time\":1100,\"field_1\":\"a1\"} {\"time\":1200,\"field_1\":\"a2\"}"; + assertThat(asString(stream.get()), equalTo(expectedStream)); + + SearchResponse response2 = createEmptySearchResponse(); + extractor.setNextResponse(response2); + assertThat(extractor.hasNext(), is(true)); + assertThat(extractor.next().isPresent(), is(false)); + assertThat(extractor.hasNext(), is(false)); + assertThat(capturedSearchRequests.size(), equalTo(1)); + + String searchRequest = capturedSearchRequests.get(0).toString().replaceAll("\\s", ""); + assertThat(searchRequest, containsString("\"size\":1000")); + assertThat(searchRequest, containsString("\"query\":{\"bool\":{\"filter\":[{\"match_all\":{\"boost\":1.0}}," + + "{\"range\":{\"time\":{\"from\":1000,\"to\":2000,\"include_lower\":true,\"include_upper\":false," + + "\"format\":\"epoch_millis\",\"boost\":1.0}}}]")); + assertThat(searchRequest, containsString("\"sort\":[{\"time\":{\"order\":\"asc\"}}]")); + assertThat(searchRequest, containsString("\"stored_fields\":\"_none_\"")); + + // Check for the scripts + assertThat(searchRequest, containsString("{\"script\":{\"inline\":\"return 1 + 1;\",\"lang\":\"painless\"}" + .replaceAll("\\s", ""))); + assertThat(searchRequest, containsString("List domainSplit(String host, Map params)".replaceAll("\\s", ""))); + assertThat(searchRequest, containsString("String replaceDots(String input) {".replaceAll("\\s", ""))); + + assertThat(capturedContinueScrollIds.size(), equalTo(1)); + assertThat(capturedContinueScrollIds.get(0), equalTo(response1.getScrollId())); + + assertThat(capturedClearScrollIds.size(), equalTo(1)); + assertThat(capturedClearScrollIds.get(0), equalTo(response2.getScrollId())); + } + private ScrollDataExtractorContext createContext(long start, long end) { return new ScrollDataExtractorContext(jobId, extractedFields, indexes, types, query, scriptFields, scrollSize, start, end); } @@ -324,4 +383,4 @@ public class ScrollDataExtractorTests extends ESTestCase { return reader.lines().collect(Collectors.joining("\n")); } } -} +} \ No newline at end of file