Move domainSplit injection to ScrollDataExtractor (elastic/x-pack-elasticsearch#554)
By injecting the domainSplit code immediately before use, we don't have to store it in the datafeed config and return huge blobs to the user when they request the config.

Original commit: elastic/x-pack-elasticsearch@2e53c930ea
This commit is contained in:
parent ac8f3ffec4
commit 72ef1acdc0
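The mechanics of the injection are simple: just before a script field is added to the scroll search request, any script whose source calls domainSplit( gets the shared helper source prepended and the helper's parameters merged into its own params; every other script passes through untouched. Below is a minimal, self-contained Java sketch of that step, not the actual Elasticsearch code: ScriptSpec is a hypothetical stand-in for org.elasticsearch.script.Script, and HELPER_SOURCE / HELPER_PARAMS stand in for DomainSplitFunction.function and DomainSplitFunction.params (the value of the "exact" entry is a placeholder).

import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the domainSplit injection step. ScriptSpec, HELPER_SOURCE and
// HELPER_PARAMS are illustrative stand-ins, not the Elasticsearch/ML classes.
public class DomainSplitInjectionSketch {

    // Stand-in for DomainSplitFunction.function: the helper source that defines
    // domainSplit(String host, Map params). Body elided here.
    static final String HELPER_SOURCE = "List domainSplit(String host, Map params) { /* ... */ }";

    // Stand-in for DomainSplitFunction.params; the "exact" value is a placeholder.
    static final Map<String, Object> HELPER_PARAMS = Map.of("exact", Map.of());

    // Hypothetical replacement for org.elasticsearch.script.Script: just source + params.
    record ScriptSpec(String source, Map<String, Object> params) {}

    // Mirrors ScrollDataExtractor#injectDomainSplit: rewrite only scripts that
    // actually call domainSplit(, leave every other script field untouched.
    static ScriptSpec injectDomainSplit(ScriptSpec script) {
        if (!script.source().contains("domainSplit(")) {
            return script;
        }
        String modifiedSource = HELPER_SOURCE + "\n" + script.source();
        Map<String, Object> modifiedParams = new HashMap<>(script.params());
        modifiedParams.putAll(HELPER_PARAMS);
        return new ScriptSpec(modifiedSource, modifiedParams);
    }

    public static void main(String[] args) {
        ScriptSpec plain = new ScriptSpec("return 1+1;", Map.of());
        ScriptSpec split = new ScriptSpec("return domainSplit('foo.com', params);", Map.of());

        System.out.println(injectDomainSplit(plain).source());  // unchanged
        System.out.println(injectDomainSplit(split).source());  // helper source prepended
        System.out.println(injectDomainSplit(split).params());  // {exact={}} merged in
    }
}

Doing this in ScrollDataExtractor means the stored DatafeedConfig keeps only the user's original script, while the expanded version exists only for the lifetime of each search request.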
DatafeedConfig.java

@@ -414,24 +414,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
         public void setScriptFields(List<SearchSourceBuilder.ScriptField> scriptFields) {
             List<SearchSourceBuilder.ScriptField> sorted = new ArrayList<>();
             for (SearchSourceBuilder.ScriptField scriptField : scriptFields) {
-                String script = scriptField.script().getIdOrCode();
-
-                if (script.contains("domainSplit(")) {
-                    String modifiedCode = DomainSplitFunction.function + "\n" + script;
-                    Map<String, Object> modifiedParams = new HashMap<>(scriptField.script().getParams().size()
-                            + DomainSplitFunction.params.size());
-
-                    modifiedParams.putAll(scriptField.script().getParams());
-                    modifiedParams.putAll(DomainSplitFunction.params);
-
-                    Script newScript = new Script(scriptField.script().getType(), scriptField.script().getLang(),
-                            modifiedCode, modifiedParams);
-
-                    sorted.add(new SearchSourceBuilder.ScriptField(scriptField.fieldName(), newScript, scriptField.ignoreFailure()));
-                } else {
-                    sorted.add(scriptField);
-                }
-
+                sorted.add(scriptField);
             }
             sorted.sort(Comparator.comparing(SearchSourceBuilder.ScriptField::fieldName));
             this.scriptFields = sorted;
ScrollDataExtractor.java

@@ -14,16 +14,21 @@ import org.elasticsearch.action.search.SearchScrollAction;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.script.Script;
 import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.fetch.StoredFieldsContext;
 import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
 import org.elasticsearch.xpack.ml.datafeed.extractor.ExtractorUtils;
+import org.elasticsearch.xpack.ml.utils.DomainSplitFunction;

 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.Optional;
@@ -111,10 +116,27 @@ class ScrollDataExtractor implements DataExtractor {
         } else {
             searchRequestBuilder.setFetchSource(sourceFields, null);
         }
-        context.scriptFields.forEach(f -> searchRequestBuilder.addScriptField(f.fieldName(), f.script()));
+        context.scriptFields.forEach(f -> searchRequestBuilder.addScriptField(
+                f.fieldName(), injectDomainSplit(f.script())));
         return searchRequestBuilder;
     }

+    private Script injectDomainSplit(Script script) {
+        String code = script.getIdOrCode();
+        if (code.contains("domainSplit(")) {
+            String modifiedCode = DomainSplitFunction.function + "\n" + script;
+            Map<String, Object> modifiedParams = new HashMap<>(script.getParams().size()
+                    + DomainSplitFunction.params.size());
+
+            modifiedParams.putAll(script.getParams());
+            modifiedParams.putAll(DomainSplitFunction.params);
+
+            return new Script(script.getType(), script.getLang(),
+                    modifiedCode, modifiedParams);
+        }
+        return script;
+    }
+
     private InputStream processSearchResponse(SearchResponse searchResponse) throws IOException {
         ExtractorUtils.checkSearchWasSuccessful(context.jobId, searchResponse);
         scrollId = searchResponse.getScrollId();
DatafeedConfigTests.java

@@ -23,7 +23,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;

-import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;

@@ -290,30 +289,6 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedConfig> {
         assertThat(datafeedConfig.hasAggregations(), is(true));
     }

-    public void testDomainSplitInjection() {
-        DatafeedConfig.Builder datafeed = new DatafeedConfig.Builder("datafeed1", "job1");
-        datafeed.setIndexes(Arrays.asList("my_index"));
-        datafeed.setTypes(Arrays.asList("my_type"));
-
-        SearchSourceBuilder.ScriptField withoutSplit = new SearchSourceBuilder.ScriptField(
-                "script1", new Script("return 1+1;"), false);
-        SearchSourceBuilder.ScriptField withSplit = new SearchSourceBuilder.ScriptField(
-                "script2", new Script("return domainSplit('foo.com', params);"), false);
-        datafeed.setScriptFields(Arrays.asList(withoutSplit, withSplit));
-
-        DatafeedConfig config = datafeed.build();
-        List<SearchSourceBuilder.ScriptField> scriptFields = config.getScriptFields();
-
-        assertThat(scriptFields.size(), equalTo(2));
-        assertThat(scriptFields.get(0).fieldName(), equalTo("script1"));
-        assertThat(scriptFields.get(0).script().getIdOrCode(), equalTo("return 1+1;"));
-        assertFalse(scriptFields.get(0).script().getParams().containsKey("exact"));
-
-        assertThat(scriptFields.get(1).fieldName(), equalTo("script2"));
-        assertThat(scriptFields.get(1).script().getIdOrCode(), containsString("List domainSplit(String host, Map params)"));
-        assertTrue(scriptFields.get(1).script().getParams().containsKey("exact"));
-    }
-
     public static String randomValidDatafeedId() {
         CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray());
         return generator.ofCodePointsLength(random(), 10, 10);
ScrollDataExtractorTests.java

@@ -17,6 +17,7 @@ import org.elasticsearch.search.SearchHitField;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.script.Script;
 import org.junit.Before;

 import java.io.BufferedReader;
@@ -58,7 +59,11 @@ public class ScrollDataExtractorTests extends ESTestCase {
        private SearchResponse nextResponse;

        TestDataExtractor(long start, long end) {
-           super(client, createContext(start, end));
+           this(createContext(start, end));
+       }
+
+       TestDataExtractor(ScrollDataExtractorContext context) {
+           super(client, context);
        }

        @Override
@@ -270,6 +275,60 @@ public class ScrollDataExtractorTests extends ESTestCase {
         assertThat(e.getMessage(), equalTo("[" + jobId + "] Search request encountered [1] unavailable shards"));
     }

+    public void testDomainSplitScriptField() throws IOException {
+
+        SearchSourceBuilder.ScriptField withoutSplit = new SearchSourceBuilder.ScriptField(
+                "script1", new Script("return 1+1;"), false);
+        SearchSourceBuilder.ScriptField withSplit = new SearchSourceBuilder.ScriptField(
+                "script2", new Script("return domainSplit('foo.com', params);"), false);
+
+        List<SearchSourceBuilder.ScriptField> sFields = Arrays.asList(withoutSplit, withSplit);
+        ScrollDataExtractorContext context = new ScrollDataExtractorContext(jobId, extractedFields, indexes,
+                types, query, sFields, scrollSize, 1000, 2000);
+
+        TestDataExtractor extractor = new TestDataExtractor(context);
+
+        SearchResponse response1 = createSearchResponse(
+                Arrays.asList(1100L, 1200L),
+                Arrays.asList("a1", "a2"),
+                Arrays.asList("b1", "b2")
+        );
+        extractor.setNextResponse(response1);
+
+        assertThat(extractor.hasNext(), is(true));
+        Optional<InputStream> stream = extractor.next();
+        assertThat(stream.isPresent(), is(true));
+        String expectedStream = "{\"time\":1100,\"field_1\":\"a1\"} {\"time\":1200,\"field_1\":\"a2\"}";
+        assertThat(asString(stream.get()), equalTo(expectedStream));
+
+        SearchResponse response2 = createEmptySearchResponse();
+        extractor.setNextResponse(response2);
+        assertThat(extractor.hasNext(), is(true));
+        assertThat(extractor.next().isPresent(), is(false));
+        assertThat(extractor.hasNext(), is(false));
+        assertThat(capturedSearchRequests.size(), equalTo(1));
+
+        String searchRequest = capturedSearchRequests.get(0).toString().replaceAll("\\s", "");
+        assertThat(searchRequest, containsString("\"size\":1000"));
+        assertThat(searchRequest, containsString("\"query\":{\"bool\":{\"filter\":[{\"match_all\":{\"boost\":1.0}}," +
+                "{\"range\":{\"time\":{\"from\":1000,\"to\":2000,\"include_lower\":true,\"include_upper\":false," +
+                "\"format\":\"epoch_millis\",\"boost\":1.0}}}]"));
+        assertThat(searchRequest, containsString("\"sort\":[{\"time\":{\"order\":\"asc\"}}]"));
+        assertThat(searchRequest, containsString("\"stored_fields\":\"_none_\""));
+
+        // Check for the scripts
+        assertThat(searchRequest, containsString("{\"script\":{\"inline\":\"return 1 + 1;\",\"lang\":\"painless\"}"
+                .replaceAll("\\s", "")));
+        assertThat(searchRequest, containsString("List domainSplit(String host, Map params)".replaceAll("\\s", "")));
+        assertThat(searchRequest, containsString("String replaceDots(String input) {".replaceAll("\\s", "")));
+
+        assertThat(capturedContinueScrollIds.size(), equalTo(1));
+        assertThat(capturedContinueScrollIds.get(0), equalTo(response1.getScrollId()));
+
+        assertThat(capturedClearScrollIds.size(), equalTo(1));
+        assertThat(capturedClearScrollIds.get(0), equalTo(response2.getScrollId()));
+    }
+
     private ScrollDataExtractorContext createContext(long start, long end) {
         return new ScrollDataExtractorContext(jobId, extractedFields, indexes, types, query, scriptFields, scrollSize, start, end);
     }
@@ -324,4 +383,4 @@ public class ScrollDataExtractorTests extends ESTestCase {
             return reader.lines().collect(Collectors.joining("\n"));
         }
     }
 }