Remove aggs and retrieve_whole_source from SchedulerConfig (elastic/elasticsearch#569)

Aggs does not need to be a separate member field. There can simply
be an aggs parse field which also then stored onto the aggregations
parse field.

Finally, retrieve_whole_source is unnecessary as we move towards
a node client based data extraction.

Original commit: elastic/x-pack-elasticsearch@14024c2ee5
This commit is contained in:
Dimitris Athanasiou 2016-12-19 09:54:59 +00:00 committed by GitHub
parent 89e506f050
commit 6a14f52d2e
11 changed files with 51 additions and 252 deletions

View File

@ -190,7 +190,6 @@ public final class Messages {
public static final String SCHEDULER_CONFIG_FIELD_NOT_SUPPORTED = "scheduler.config.field.not.supported";
public static final String SCHEDULER_CONFIG_INVALID_OPTION_VALUE = "scheduler.config.invalid.option.value";
public static final String SCHEDULER_CONFIG_MULTIPLE_AGGREGATIONS = "scheduler.config.multiple.aggregations";
public static final String SCHEDULER_DOES_NOT_SUPPORT_JOB_WITH_LATENCY = "scheduler.does.not.support.job.with.latency";
public static final String SCHEDULER_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD =

View File

@ -24,7 +24,7 @@ public final class ScheduledJobValidator {
if (analysisConfig.getLatency() != null && analysisConfig.getLatency() > 0) {
throw new IllegalArgumentException(Messages.getMessage(Messages.SCHEDULER_DOES_NOT_SUPPORT_JOB_WITH_LATENCY));
}
if (schedulerConfig.getAggregationsOrAggs() != null
if (schedulerConfig.getAggregations() != null
&& !SchedulerConfig.DOC_COUNT.equals(analysisConfig.getSummaryCountFieldName())) {
throw new IllegalArgumentException(
Messages.getMessage(Messages.SCHEDULER_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD, SchedulerConfig.DOC_COUNT));

View File

@ -56,7 +56,6 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
public static final ParseField INDEXES = new ParseField("indexes");
public static final ParseField TYPES = new ParseField("types");
public static final ParseField QUERY = new ParseField("query");
public static final ParseField RETRIEVE_WHOLE_SOURCE = new ParseField("retrieve_whole_source");
public static final ParseField SCROLL_SIZE = new ParseField("scroll_size");
public static final ParseField AGGREGATIONS = new ParseField("aggregations");
public static final ParseField AGGS = new ParseField("aggs");
@ -85,7 +84,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
throw new RuntimeException(e);
}
}, AGGREGATIONS);
PARSER.declareObject(Builder::setAggs, (p, c) -> {
PARSER.declareObject(Builder::setAggregations, (p, c) -> {
try {
return p.map();
} catch (IOException e) {
@ -99,7 +98,6 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
throw new RuntimeException(e);
}
}, SCRIPT_FIELDS);
PARSER.declareBoolean(Builder::setRetrieveWholeSource, RETRIEVE_WHOLE_SOURCE);
PARSER.declareInt(Builder::setScrollSize, SCROLL_SIZE);
}
@ -122,14 +120,12 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
// SearchSourceBuilder field holding the entire source:
private final Map<String, Object> query;
private final Map<String, Object> aggregations;
private final Map<String, Object> aggs;
private final Map<String, Object> scriptFields;
private final Boolean retrieveWholeSource;
private final Integer scrollSize;
private SchedulerConfig(String id, String jobId, Long queryDelay, Long frequency, List<String> indexes, List<String> types,
Map<String, Object> query, Map<String, Object> aggregations, Map<String, Object> aggs,
Map<String, Object> scriptFields, Boolean retrieveWholeSource, Integer scrollSize) {
Map<String, Object> query, Map<String, Object> aggregations, Map<String, Object> scriptFields,
Integer scrollSize) {
this.id = id;
this.jobId = jobId;
this.queryDelay = queryDelay;
@ -138,9 +134,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
this.types = types;
this.query = query;
this.aggregations = aggregations;
this.aggs = aggs;
this.scriptFields = scriptFields;
this.retrieveWholeSource = retrieveWholeSource;
this.scrollSize = scrollSize;
}
@ -169,17 +163,11 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
} else {
this.aggregations = null;
}
if (in.readBoolean()) {
this.aggs = in.readMap();
} else {
this.aggs = null;
}
if (in.readBoolean()) {
this.scriptFields = in.readMap();
} else {
this.scriptFields = null;
}
this.retrieveWholeSource = in.readOptionalBoolean();
this.scrollSize = in.readOptionalVInt();
}
@ -232,17 +220,6 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
return this.query;
}
/**
* For the ELASTICSEARCH data source only, should the whole _source document
* be retrieved for analysis, or just the analysis fields?
*
* @return Should the whole of _source be retrieved? (<code>null</code> if
* not set.)
*/
public Boolean getRetrieveWholeSource() {
return this.retrieveWholeSource;
}
/**
* For the ELASTICSEARCH data source only, get the size of documents to be
* retrieved from each shard via a scroll search
@ -271,7 +248,6 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
* aggregations to apply to the search to be submitted to Elasticsearch to
* get the input data. This class does not attempt to interpret the
* aggregations. The map will be converted back to an arbitrary JSON object.
* Synonym for {@link #getAggs()} (like Elasticsearch).
*
* @return The aggregations, or <code>null</code> if not set.
*/
@ -279,29 +255,6 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
return this.aggregations;
}
/**
* For the ELASTICSEARCH data source only, optional Elasticsearch
* aggregations to apply to the search to be submitted to Elasticsearch to
* get the input data. This class does not attempt to interpret the
* aggregations. The map will be converted back to an arbitrary JSON object.
* Synonym for {@link #getAggregations()} (like Elasticsearch).
*
* @return The aggregations, or <code>null</code> if not set.
*/
public Map<String, Object> getAggs() {
return this.aggs;
}
/**
* Convenience method to get either aggregations or aggs.
*
* @return The aggregations (whether initially specified in aggregations or
* aggs), or <code>null</code> if neither are set.
*/
public Map<String, Object> getAggregationsOrAggs() {
return (this.aggregations != null) ? this.aggregations : this.aggs;
}
/**
* Build the list of fields expected in the output from aggregations
* submitted to Elasticsearch.
@ -309,7 +262,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
* @return The list of fields, or empty list if there are no aggregations.
*/
public List<String> buildAggregatedFieldList() {
Map<String, Object> aggs = getAggregationsOrAggs();
Map<String, Object> aggs = getAggregations();
if (aggs == null) {
return Collections.emptyList();
}
@ -363,19 +316,12 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
} else {
out.writeBoolean(false);
}
if (aggs != null) {
out.writeBoolean(true);
out.writeMap(aggs);
} else {
out.writeBoolean(false);
}
if (scriptFields != null) {
out.writeBoolean(true);
out.writeMap(scriptFields);
} else {
out.writeBoolean(false);
}
out.writeOptionalBoolean(retrieveWholeSource);
out.writeOptionalVInt(scrollSize);
}
@ -408,15 +354,9 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
if (aggregations != null) {
builder.field(AGGREGATIONS.getPreferredName(), aggregations);
}
if (aggs != null) {
builder.field(AGGS.getPreferredName(), aggs);
}
if (scriptFields != null) {
builder.field(SCRIPT_FIELDS.getPreferredName(), scriptFields);
}
if (retrieveWholeSource != null) {
builder.field(RETRIEVE_WHOLE_SOURCE.getPreferredName(), retrieveWholeSource);
}
if (scrollSize != null) {
builder.field(SCROLL_SIZE.getPreferredName(), scrollSize);
}
@ -445,16 +385,16 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
&& Objects.equals(this.frequency, that.frequency)
&& Objects.equals(this.queryDelay, that.queryDelay)
&& Objects.equals(this.indexes, that.indexes)
&& Objects.equals(this.types, that.types) && Objects.equals(this.query, that.query)
&& Objects.equals(this.retrieveWholeSource, that.retrieveWholeSource) && Objects.equals(this.scrollSize, that.scrollSize)
&& Objects.equals(this.getAggregationsOrAggs(), that.getAggregationsOrAggs())
&& Objects.equals(this.types, that.types)
&& Objects.equals(this.query, that.query)
&& Objects.equals(this.scrollSize, that.scrollSize)
&& Objects.equals(this.getAggregations(), that.getAggregations())
&& Objects.equals(this.scriptFields, that.scriptFields);
}
@Override
public int hashCode() {
return Objects.hash(id, jobId, frequency, queryDelay, indexes, types, query, retrieveWholeSource, scrollSize,
getAggregationsOrAggs(), scriptFields);
return Objects.hash(id, jobId, frequency, queryDelay, indexes, types, query, scrollSize, getAggregations(), scriptFields);
}
public static class Builder {
@ -478,7 +418,6 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
// NORELEASE: Use SearchSourceBuilder
private Map<String, Object> query = null;
private Map<String, Object> aggregations = null;
private Map<String, Object> aggs = null;
private Map<String, Object> scriptFields = null;
private Boolean retrieveWholeSource;
private Integer scrollSize;
@ -507,9 +446,7 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
this.types = config.types;
this.query = config.query;
this.aggregations = config.aggregations;
this.aggs = config.aggs;
this.scriptFields = config.scriptFields;
this.retrieveWholeSource = config.retrieveWholeSource;
this.scrollSize = config.scrollSize;
}
@ -557,11 +494,6 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
this.aggregations = Objects.requireNonNull(aggregations);
}
public void setAggs(Map<String, Object> aggs) {
// NORELEASE: make use of Collections.unmodifiableMap(...)
this.aggs = Objects.requireNonNull(aggs);
}
public void setScriptFields(Map<String, Object> scriptFields) {
// NORELEASE: make use of Collections.unmodifiableMap(...)
this.scriptFields = Objects.requireNonNull(scriptFields);
@ -592,17 +524,12 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
if (types == null || types.isEmpty() || types.contains(null) || types.contains("")) {
throw invalidOptionValue(TYPES.getPreferredName(), types);
}
if (aggregations != null && aggs != null) {
String msg = Messages.getMessage(Messages.SCHEDULER_CONFIG_MULTIPLE_AGGREGATIONS);
throw new IllegalArgumentException(msg);
}
if (Boolean.TRUE.equals(retrieveWholeSource)) {
if (scriptFields != null) {
throw notSupportedValue(SCRIPT_FIELDS, Messages.SCHEDULER_CONFIG_FIELD_NOT_SUPPORTED);
}
}
return new SchedulerConfig(id, jobId, queryDelay, frequency, indexes, types, query, aggregations, aggs, scriptFields,
retrieveWholeSource, scrollSize);
return new SchedulerConfig(id, jobId, queryDelay, frequency, indexes, types, query, aggregations, scriptFields, scrollSize);
}
private static ElasticsearchException invalidOptionValue(String fieldName, Object value) {

View File

@ -70,19 +70,16 @@ public class ElasticsearchQueryBuilder {
private static final String AGGREGATION_TEMPLATE = ", \"aggs\": %s";
private static final String SCRIPT_FIELDS_TEMPLATE = ", \"script_fields\": %s";
private static final String FIELDS_TEMPLATE = "%s, \"_source\": %s";
private final String search;
private final String aggregations;
private final String scriptFields;
private final String fields;
private final String timeField;
public ElasticsearchQueryBuilder(String search, String aggs, String scriptFields, String fields, String timeField) {
public ElasticsearchQueryBuilder(String search, String aggs, String scriptFields, String timeField) {
this.search = Objects.requireNonNull(search);
aggregations = aggs;
this.scriptFields = scriptFields;
this.fields = fields;
this.timeField = Objects.requireNonNull(timeField);
}
@ -102,17 +99,13 @@ public class ElasticsearchQueryBuilder {
}
private String createResultsFormatSpec(String aggs) {
return (aggs != null) ? createAggregations(aggs) : ((fields != null) ? createFieldDataFields() : "");
return (aggs != null) ? createAggregations(aggs) : createScriptFields();
}
private String createAggregations(String aggs) {
return String.format(Locale.ROOT, AGGREGATION_TEMPLATE, aggs);
}
private String createFieldDataFields() {
return String.format(Locale.ROOT, FIELDS_TEMPLATE, createScriptFields(), fields);
}
private String createScriptFields() {
return (scriptFields != null) ? String.format(Locale.ROOT, SCRIPT_FIELDS_TEMPLATE, scriptFields) : "";
}
@ -125,14 +118,10 @@ public class ElasticsearchQueryBuilder {
public void logQueryInfo(Logger logger) {
if (aggregations != null) {
logger.debug("Will use the following Elasticsearch aggregations: " + aggregations);
} else {
if (fields != null) {
logger.debug("Will request only the following field(s) from Elasticsearch: " + String.join(" ", fields));
} else {
logger.debug("Will retrieve whole _source document from Elasticsearch");
}
}
}
public boolean isAggregated() {
return aggregations != null;

View File

@ -14,12 +14,11 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfig;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractorFactory;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfig;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public class HttpDataExtractorFactory implements DataExtractorFactory {
@ -37,9 +36,8 @@ public class HttpDataExtractorFactory implements DataExtractorFactory {
String timeField = job.getDataDescription().getTimeField();
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(
stringifyElasticsearchQuery(schedulerConfig.getQuery()),
stringifyElasticsearchAggregations(schedulerConfig.getAggregations(), schedulerConfig.getAggs()),
stringifyElasticsearchAggregations(schedulerConfig.getAggregations()),
stringifyElasticsearchScriptFields(schedulerConfig.getScriptFields()),
Boolean.TRUE.equals(schedulerConfig.getRetrieveWholeSource()) ? null : writeListAsJson(job.allFields()),
timeField);
HttpRequester httpRequester = new HttpRequester();
ElasticsearchUrlBuilder urlBuilder = ElasticsearchUrlBuilder
@ -63,13 +61,10 @@ public class HttpDataExtractorFactory implements DataExtractorFactory {
return queryStr;
}
String stringifyElasticsearchAggregations(Map<String, Object> aggregationsMap, Map<String, Object> aggsMap) {
String stringifyElasticsearchAggregations(Map<String, Object> aggregationsMap) {
if (aggregationsMap != null) {
return writeMapAsJson(aggregationsMap);
}
if (aggsMap != null) {
return writeMapAsJson(aggsMap);
}
return null;
}
@ -89,20 +84,4 @@ public class HttpDataExtractorFactory implements DataExtractorFactory {
throw new ElasticsearchParseException("failed to convert map to JSON string", e);
}
}
private static String writeListAsJson(List<String> list) {
try {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
builder.startArray("a");
for (String e : list) {
builder.value(e);
}
builder.endArray();
builder.endObject();
return builder.string().replace("{\"a\":", "").replace("}", "");
} catch (IOException e) {
throw new ElasticsearchParseException("failed to convert map to JSON string", e);
}
}
}

View File

@ -139,7 +139,6 @@ job.config.unknown.function = Unknown function ''{0}''
scheduler.config.field.not.supported = Scheduler configuration field {0} not supported
scheduler.config.invalid.option.value = Invalid {0} value ''{1}'' in scheduler configuration
scheduler.config.multiple.aggregations = Both aggregations and aggs were specified - please just specify one
scheduler.does.not.support.job.with.latency = A job configured with scheduler cannot support latency
scheduler.aggregations.requires.job.with.summary.count.field = A job configured with a scheduler with aggregations must have summary_count_field_name ''{0}''

View File

@ -120,7 +120,7 @@ public class ScheduledJobIT extends ESRestTestCase {
private Response createScheduler(String schedulerId, String jobId) throws IOException {
String schedulerConfig = "{" + "\"job_id\": \"" + jobId + "\",\n" + "\"indexes\":[\"airline-data\"],\n"
+ "\"types\":[\"response\"],\n" + "\"retrieve_whole_source\":true\n" + "}";
+ "\"types\":[\"response\"]\n" + "}";
return client().performRequest("put", PrelertPlugin.BASE_PATH + "schedulers/" + schedulerId, Collections.emptyMap(),
new StringEntity(schedulerConfig));
}

View File

@ -166,7 +166,7 @@ public class ScheduledJobValidatorTests extends ESTestCase {
"} " +
"}";
XContentParser parser = XContentFactory.xContent(aggStr).createParser(aggStr);
schedulerConfig.setAggs(parser.map());
schedulerConfig.setAggregations(parser.map());
return schedulerConfig;
}

View File

@ -45,8 +45,6 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerC
}
if (randomBoolean()) {
builder.setAggregations(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)));
} else if (randomBoolean()) {
builder.setAggs(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)));
}
if (randomBoolean()) {
builder.setFrequency(randomPositiveLong());
@ -104,32 +102,39 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerC
assertTrue(builder.build().buildAggregatedFieldList().isEmpty());
}
/**
* Test parsing of the opaque {@link SchedulerConfig#getAggs()} object
*/
public void testAggsParse() throws IOException {
Logger logger = Loggers.getLogger(SchedulerConfigTests.class);
String configStr = "{" + "\"scheduler_id\":\"scheduler1\"," + "\"job_id\":\"job1\"," + "\"indexes\":[\"farequote\"],"
String aggregationsConfig = "{" + "\"scheduler_id\":\"scheduler1\"," + "\"job_id\":\"job1\"," + "\"indexes\":[\"farequote\"],"
+ "\"types\":[\"farequote\"]," + "\"query\":{\"match_all\":{} }," + "\"aggs\" : {" + "\"top_level_must_be_time\" : {"
+ "\"histogram\" : {" + "\"field\" : \"@timestamp\"," + "\"interval\" : 3600000" + "}," + "\"aggs\" : {"
+ "\"by_field_in_the_middle\" : { " + "\"terms\" : {" + "\"field\" : \"airline\"," + "\"size\" : 0" + "}," + "\"aggs\" : {"
+ "\"stats_last\" : {" + "\"avg\" : {" + "\"field\" : \"responsetime\"" + "}" + "}" + "} " + "}" + "}" + "}" + "}" + "}"
+ "}";
XContentParser parser = XContentFactory.xContent(configStr).createParser(configStr);
SchedulerConfig schedulerConfig = SchedulerConfig.PARSER.apply(parser, () -> ParseFieldMatcher.STRICT).build();
assertNotNull(schedulerConfig);
String aggsConfig = "{" + "\"scheduler_id\":\"scheduler1\"," + "\"job_id\":\"job1\"," + "\"indexes\":[\"farequote\"],"
+ "\"types\":[\"farequote\"]," + "\"query\":{\"match_all\":{} }," + "\"aggs\" : {" + "\"top_level_must_be_time\" : {"
+ "\"histogram\" : {" + "\"field\" : \"@timestamp\"," + "\"interval\" : 3600000" + "}," + "\"aggs\" : {"
+ "\"by_field_in_the_middle\" : { " + "\"terms\" : {" + "\"field\" : \"airline\"," + "\"size\" : 0" + "}," + "\"aggs\" : {"
+ "\"stats_last\" : {" + "\"avg\" : {" + "\"field\" : \"responsetime\"" + "}" + "}" + "} " + "}" + "}" + "}" + "}" + "}"
+ "}";
Map<String, Object> aggs = schedulerConfig.getAggregationsOrAggs();
XContentParser parser = XContentFactory.xContent(aggregationsConfig).createParser(aggregationsConfig);
SchedulerConfig aggregationsSchedulerConfig = SchedulerConfig.PARSER.apply(parser, () -> ParseFieldMatcher.STRICT).build();
parser = XContentFactory.xContent(aggsConfig).createParser(aggsConfig);
SchedulerConfig aggsSchedulerConfig = SchedulerConfig.PARSER.apply(parser, () -> ParseFieldMatcher.STRICT).build();
assertNotNull(aggregationsSchedulerConfig);
assertNotNull(aggsSchedulerConfig);
assertEquals(aggregationsSchedulerConfig, aggsSchedulerConfig);
Map<String, Object> aggs = aggsSchedulerConfig.getAggregations();
assertNotNull(aggs);
String aggsAsJson = XContentFactory.jsonBuilder().map(aggs).string();
logger.info("Round trip of aggs is: " + aggsAsJson);
assertTrue(aggs.containsKey("top_level_must_be_time"));
List<String> aggregatedFieldList = schedulerConfig.buildAggregatedFieldList();
List<String> aggregatedFieldList = aggsSchedulerConfig.buildAggregatedFieldList();
assertEquals(3, aggregatedFieldList.size());
assertEquals("@timestamp", aggregatedFieldList.get(0));
assertEquals("airline", aggregatedFieldList.get(1));
@ -341,18 +346,6 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerC
assertEquals(Messages.getMessage(Messages.SCHEDULER_CONFIG_INVALID_OPTION_VALUE, "scroll_size", -1000L), e.getMessage());
}
public void testCheckValid_GivenBothAggregationsAndAggsAreSet() {
SchedulerConfig.Builder conf = new SchedulerConfig.Builder("scheduler1", "job1");
conf.setIndexes(Arrays.asList("myindex"));
conf.setTypes(Arrays.asList("mytype"));
conf.setScrollSize(1000);
Map<String, Object> aggs = new HashMap<>();
conf.setAggregations(aggs);
conf.setAggs(aggs);
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, conf::build);
assertEquals(Messages.getMessage(Messages.SCHEDULER_CONFIG_MULTIPLE_AGGREGATIONS), e.getMessage());
}
public static String randomValidSchedulerId() {
CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray());
return generator.ofCodePointsLength(random(), 10, 10);

View File

@ -36,7 +36,6 @@ public class ElasticsearchDataExtractorTests extends ESTestCase {
private String aggregations;
private String scriptFields;
private String fields;
private ElasticsearchDataExtractor extractor;
@ -239,66 +238,6 @@ public class ElasticsearchDataExtractorTests extends ESTestCase {
expectThrows(NoSuchElementException.class, () -> extractor.next());
}
public void testDataExtractionWithFields() throws IOException {
fields = "[\"id\"]";
String initialResponse = "{" + "\"_scroll_id\":\"c2Nhbjs2OzM0NDg1ODpzRlBLc0FXNlNyNm5JWUc1\"," + "\"took\":17,"
+ "\"timed_out\":false," + "\"_shards\":{" + " \"total\":1," + " \"successful\":1," + " \"failed\":0" + "},"
+ "\"hits\":{" + " \"total\":1437," + " \"max_score\":null," + " \"hits\":[" + " \"_index\":\"dataIndex\","
+ " \"_type\":\"dataType\"," + " \"_id\":\"1403481600\"," + " \"_score\":null," + " \"fields\":{"
+ " \"id\":[\"1403481600\"]" + " }" + " ]" + "}" + "}";
String scrollResponse = "{" + "\"_scroll_id\":\"secondScrollId\"," + "\"took\":8," + "\"timed_out\":false," + "\"_shards\":{"
+ " \"total\":1," + " \"successful\":1," + " \"failed\":0" + "}," + "\"hits\":{" + " \"total\":1437,"
+ " \"max_score\":null," + " \"hits\":[" + " \"_index\":\"dataIndex\"," + " \"_type\":\"dataType\","
+ " \"_id\":\"1403782200\"," + " \"_score\":null," + " \"fields\":{" + " \"id\":[\"1403782200\"]" + " }"
+ " ]" + "}" + "}";
String scrollEndResponse = "{" + "\"_scroll_id\":\"thirdScrollId\"," + "\"took\":8," + "\"timed_out\":false," + "\"_shards\":{"
+ " \"total\":1," + " \"successful\":1," + " \"failed\":0" + "}," + "\"hits\":{" + " \"total\":1437,"
+ " \"max_score\":null," + " \"hits\":[]" + "}" + "}";
List<HttpResponse> responses = Arrays.asList(new HttpResponse(toStream(initialResponse), 200),
new HttpResponse(toStream(scrollResponse), 200), new HttpResponse(toStream(scrollEndResponse), 200));
MockHttpRequester requester = new MockHttpRequester(responses);
createExtractor(requester);
extractor.newSearch(1400000000L, 1403600000L, jobLogger);
assertTrue(extractor.hasNext());
assertEquals(initialResponse, streamToString(extractor.next().get()));
assertTrue(extractor.hasNext());
assertEquals(scrollResponse, streamToString(extractor.next().get()));
assertTrue(extractor.hasNext());
assertFalse(extractor.next().isPresent());
assertFalse(extractor.hasNext());
requester.assertEqualRequestsToResponses();
requester.assertResponsesHaveBeenConsumed();
RequestParams firstRequestParams = requester.getGetRequestParams(0);
assertEquals("http://localhost:9200/index-*/dataType/_search?scroll=60m&size=1000", firstRequestParams.url);
String expectedSearchBody = "{" + " \"sort\": [" + " {\"time\": {\"order\": \"asc\"}}" + " ]," + " \"query\": {"
+ " \"bool\": {" + " \"filter\": [" + " {\"match_all\":{}}," + " {" + " \"range\": {"
+ " \"time\": {" + " \"gte\": \"1970-01-17T04:53:20.000Z\","
+ " \"lt\": \"1970-01-17T05:53:20.000Z\"," + " \"format\": \"date_time\"" + " }"
+ " }" + " }" + " ]" + " }" + " }," + " \"_source\": [\"id\"]" + "}";
assertEquals(expectedSearchBody.replaceAll(" ", ""), firstRequestParams.requestBody.replaceAll(" ", ""));
RequestParams secondRequestParams = requester.getGetRequestParams(1);
assertEquals("http://localhost:9200/_search/scroll?scroll=60m", secondRequestParams.url);
assertEquals("c2Nhbjs2OzM0NDg1ODpzRlBLc0FXNlNyNm5JWUc1", secondRequestParams.requestBody);
RequestParams thirdRequestParams = requester.getGetRequestParams(2);
assertEquals("http://localhost:9200/_search/scroll?scroll=60m", thirdRequestParams.url);
assertEquals("secondScrollId", thirdRequestParams.requestBody);
assertEquals("http://localhost:9200/_search/scroll", requester.getDeleteRequestParams(0).url);
assertEquals("{\"scroll_id\":[\"thirdScrollId\"]}", requester.getDeleteRequestParams(0).requestBody);
assertEquals(1, requester.deleteRequestParams.size());
}
public void testDataExtractionWithAggregations() throws IOException {
aggregations = "{\"my-aggs\": {\"terms\":{\"field\":\"foo\"}}}";
@ -687,7 +626,7 @@ public class ElasticsearchDataExtractorTests extends ESTestCase {
}
private void createExtractor(MockHttpRequester httpRequester) {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(SEARCH, aggregations, scriptFields, fields, TIME_FIELD);
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(SEARCH, aggregations, scriptFields, TIME_FIELD);
ElasticsearchUrlBuilder urlBuilder = ElasticsearchUrlBuilder.create(INDEXES, TYPES);
extractor = new ElasticsearchDataExtractor(httpRequester, urlBuilder, queryBuilder, 1000);
}

View File

@ -14,7 +14,7 @@ import static org.mockito.Mockito.verify;
public class ElasticsearchQueryBuilderTests extends ESTestCase {
public void testCreateSearchBody_GivenQueryOnly() {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", null, null, null, "time");
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", null, null, "time");
assertFalse(queryBuilder.isAggregated());
@ -28,25 +28,9 @@ public class ElasticsearchQueryBuilderTests extends ESTestCase {
assertEquals(expected.replaceAll(" ", ""), searchBody.replaceAll(" ", ""));
}
public void testCreateSearchBody_GivenQueryAndFields() {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", null, null, "[\"foo\",\"bar\"]",
"@timestamp");
assertFalse(queryBuilder.isAggregated());
String searchBody = queryBuilder.createSearchBody(1451606400000L, 1451610000000L);
String expected = "{" + " \"sort\": [" + " {\"@timestamp\": {\"order\": \"asc\"}}" + " ]," + " \"query\": {"
+ " \"bool\": {" + " \"filter\": [" + " {\"match_all\":{}}," + " {" + " \"range\": {"
+ " \"@timestamp\": {" + " \"gte\": \"2016-01-01T00:00:00.000Z\","
+ " \"lt\": \"2016-01-01T01:00:00.000Z\"," + " \"format\": \"date_time\"" + " }"
+ " }" + " }" + " ]" + " }" + " }," + " \"_source\": [\"foo\",\"bar\"]" + "}";
assertEquals(expected.replaceAll(" ", ""), searchBody.replaceAll(" ", ""));
}
public void testCreateSearchBody_GivenQueryAndFieldsAndScriptFields() {
public void testCreateSearchBody_GivenQueryAndScriptFields() {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", null,
"{\"test1\":{\"script\": \"...\"}}", "[\"foo\",\"bar\"]", "@timestamp");
"{\"test1\":{\"script\": \"...\"}}", "@timestamp");
assertFalse(queryBuilder.isAggregated());
@ -56,13 +40,13 @@ public class ElasticsearchQueryBuilderTests extends ESTestCase {
+ " \"bool\": {" + " \"filter\": [" + " {\"match_all\":{}}," + " {" + " \"range\": {"
+ " \"@timestamp\": {" + " \"gte\": \"2016-01-01T00:00:00.000Z\","
+ " \"lt\": \"2016-01-01T01:00:00.000Z\"," + " \"format\": \"date_time\"" + " }"
+ " }" + " }" + " ]" + " }" + " }," + " \"script_fields\": {\"test1\":{\"script\":\"...\"}},"
+ " \"_source\": [\"foo\",\"bar\"]" + "}";
+ " }" + " }" + " ]" + " }" + " }," + " \"script_fields\": {\"test1\":{\"script\":\"...\"}}"
+ "}";
assertEquals(expected.replaceAll(" ", ""), searchBody.replaceAll(" ", ""));
}
public void testCreateSearchBody_GivenQueryAndAggs() {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", "{\"my_aggs\":{}}", null, null, "time");
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", "{\"my_aggs\":{}}", null, "time");
assertTrue(queryBuilder.isAggregated());
@ -77,7 +61,7 @@ public class ElasticsearchQueryBuilderTests extends ESTestCase {
}
public void testCreateDataSummaryQuery_GivenQueryOnly() {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", null, null, null, "@timestamp");
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", null, null, "@timestamp");
assertFalse(queryBuilder.isAggregated());
@ -93,9 +77,9 @@ public class ElasticsearchQueryBuilderTests extends ESTestCase {
assertEquals(expected.replaceAll(" ", ""), dataSummaryQuery.replaceAll(" ", ""));
}
public void testCreateDataSummaryQuery_GivenQueryAndFieldsAndScriptFields() {
public void testCreateDataSummaryQuery_GivenQueryAndScriptFields() {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", null,
"{\"test1\":{\"script\": \"...\"}}", "[\"foo\",\"bar\"]", "@timestamp");
"{\"test1\":{\"script\": \"...\"}}", "@timestamp");
assertFalse(queryBuilder.isAggregated());
@ -112,8 +96,7 @@ public class ElasticsearchQueryBuilderTests extends ESTestCase {
}
public void testCreateDataSummaryQuery_GivenQueryAndAggs() {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", "{\"my_aggs\":{}}", null, null,
"@timestamp");
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", "{\"my_aggs\":{}}", null, "@timestamp");
assertTrue(queryBuilder.isAggregated());
@ -130,7 +113,7 @@ public class ElasticsearchQueryBuilderTests extends ESTestCase {
}
public void testLogQueryInfo_GivenNoAggsNoFields() {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", null, null, null, "@timestamp");
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", null, null, "@timestamp");
Logger logger = mock(Logger.class);
queryBuilder.logQueryInfo(logger);
@ -138,18 +121,9 @@ public class ElasticsearchQueryBuilderTests extends ESTestCase {
verify(logger).debug("Will retrieve whole _source document from Elasticsearch");
}
public void testLogQueryInfo_GivenFields() {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", null, null, "[\"foo\"]", "@timestamp");
Logger logger = mock(Logger.class);
queryBuilder.logQueryInfo(logger);
verify(logger).debug("Will request only the following field(s) from Elasticsearch: [\"foo\"]");
}
public void testLogQueryInfo_GivenAggs() {
public void testLogQueryInfo() {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder("\"match_all\":{}", "{\"my_aggs\":{ \"foo\": \"bar\" }}",
null, null, "@timestamp");
null, "@timestamp");
Logger logger = mock(Logger.class);
queryBuilder.logQueryInfo(logger);