Replace http data extractor with a client extractor (elastic/elasticsearch#619)

* Replace http data extractor with a client extractor

This first implementation replaces the HTTP extractor
with a client extractor that uses search & scroll.

Note that this first implementation has some limitations:

- Only reads data that are in the _source
- Does not handle aggregated searches

These limitations will be addressed in follow up PRs.

Relates to elastic/elasticsearch#154

Original commit: elastic/x-pack-elasticsearch@f692ed961c
This commit is contained in:
Dimitris Athanasiou 2017-01-10 11:45:17 +00:00 committed by GitHub
parent dee7412044
commit 9e5245fd64
31 changed files with 916 additions and 2030 deletions

View File

@ -47,16 +47,16 @@ import org.elasticsearch.xpack.prelert.action.GetModelSnapshotsAction;
import org.elasticsearch.xpack.prelert.action.GetRecordsAction; import org.elasticsearch.xpack.prelert.action.GetRecordsAction;
import org.elasticsearch.xpack.prelert.action.GetSchedulersAction; import org.elasticsearch.xpack.prelert.action.GetSchedulersAction;
import org.elasticsearch.xpack.prelert.action.GetSchedulersStatsAction; import org.elasticsearch.xpack.prelert.action.GetSchedulersStatsAction;
import org.elasticsearch.xpack.prelert.action.PostDataAction;
import org.elasticsearch.xpack.prelert.action.OpenJobAction; import org.elasticsearch.xpack.prelert.action.OpenJobAction;
import org.elasticsearch.xpack.prelert.action.PostDataAction;
import org.elasticsearch.xpack.prelert.action.PutJobAction; import org.elasticsearch.xpack.prelert.action.PutJobAction;
import org.elasticsearch.xpack.prelert.action.PutListAction; import org.elasticsearch.xpack.prelert.action.PutListAction;
import org.elasticsearch.xpack.prelert.action.UpdateModelSnapshotAction;
import org.elasticsearch.xpack.prelert.action.PutSchedulerAction; import org.elasticsearch.xpack.prelert.action.PutSchedulerAction;
import org.elasticsearch.xpack.prelert.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.prelert.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.prelert.action.StartSchedulerAction; import org.elasticsearch.xpack.prelert.action.StartSchedulerAction;
import org.elasticsearch.xpack.prelert.action.StopSchedulerAction; import org.elasticsearch.xpack.prelert.action.StopSchedulerAction;
import org.elasticsearch.xpack.prelert.action.UpdateJobStatusAction; import org.elasticsearch.xpack.prelert.action.UpdateJobStatusAction;
import org.elasticsearch.xpack.prelert.action.UpdateModelSnapshotAction;
import org.elasticsearch.xpack.prelert.action.UpdateSchedulerStatusAction; import org.elasticsearch.xpack.prelert.action.UpdateSchedulerStatusAction;
import org.elasticsearch.xpack.prelert.action.ValidateDetectorAction; import org.elasticsearch.xpack.prelert.action.ValidateDetectorAction;
import org.elasticsearch.xpack.prelert.action.ValidateTransformAction; import org.elasticsearch.xpack.prelert.action.ValidateTransformAction;
@ -90,16 +90,16 @@ import org.elasticsearch.xpack.prelert.rest.job.RestDeleteJobAction;
import org.elasticsearch.xpack.prelert.rest.job.RestFlushJobAction; import org.elasticsearch.xpack.prelert.rest.job.RestFlushJobAction;
import org.elasticsearch.xpack.prelert.rest.job.RestGetJobsAction; import org.elasticsearch.xpack.prelert.rest.job.RestGetJobsAction;
import org.elasticsearch.xpack.prelert.rest.job.RestGetJobsStatsAction; import org.elasticsearch.xpack.prelert.rest.job.RestGetJobsStatsAction;
import org.elasticsearch.xpack.prelert.rest.job.RestPostDataAction;
import org.elasticsearch.xpack.prelert.rest.job.RestOpenJobAction; import org.elasticsearch.xpack.prelert.rest.job.RestOpenJobAction;
import org.elasticsearch.xpack.prelert.rest.job.RestPostDataAction;
import org.elasticsearch.xpack.prelert.rest.job.RestPutJobAction; import org.elasticsearch.xpack.prelert.rest.job.RestPutJobAction;
import org.elasticsearch.xpack.prelert.rest.list.RestDeleteListAction; import org.elasticsearch.xpack.prelert.rest.list.RestDeleteListAction;
import org.elasticsearch.xpack.prelert.rest.list.RestGetListAction; import org.elasticsearch.xpack.prelert.rest.list.RestGetListAction;
import org.elasticsearch.xpack.prelert.rest.list.RestPutListAction; import org.elasticsearch.xpack.prelert.rest.list.RestPutListAction;
import org.elasticsearch.xpack.prelert.rest.modelsnapshots.RestDeleteModelSnapshotAction; import org.elasticsearch.xpack.prelert.rest.modelsnapshots.RestDeleteModelSnapshotAction;
import org.elasticsearch.xpack.prelert.rest.modelsnapshots.RestGetModelSnapshotsAction; import org.elasticsearch.xpack.prelert.rest.modelsnapshots.RestGetModelSnapshotsAction;
import org.elasticsearch.xpack.prelert.rest.modelsnapshots.RestUpdateModelSnapshotAction;
import org.elasticsearch.xpack.prelert.rest.modelsnapshots.RestRevertModelSnapshotAction; import org.elasticsearch.xpack.prelert.rest.modelsnapshots.RestRevertModelSnapshotAction;
import org.elasticsearch.xpack.prelert.rest.modelsnapshots.RestUpdateModelSnapshotAction;
import org.elasticsearch.xpack.prelert.rest.results.RestGetBucketsAction; import org.elasticsearch.xpack.prelert.rest.results.RestGetBucketsAction;
import org.elasticsearch.xpack.prelert.rest.results.RestGetCategoriesAction; import org.elasticsearch.xpack.prelert.rest.results.RestGetCategoriesAction;
import org.elasticsearch.xpack.prelert.rest.results.RestGetInfluencersAction; import org.elasticsearch.xpack.prelert.rest.results.RestGetInfluencersAction;
@ -114,7 +114,6 @@ import org.elasticsearch.xpack.prelert.rest.validate.RestValidateDetectorAction;
import org.elasticsearch.xpack.prelert.rest.validate.RestValidateTransformAction; import org.elasticsearch.xpack.prelert.rest.validate.RestValidateTransformAction;
import org.elasticsearch.xpack.prelert.rest.validate.RestValidateTransformsAction; import org.elasticsearch.xpack.prelert.rest.validate.RestValidateTransformsAction;
import org.elasticsearch.xpack.prelert.scheduler.ScheduledJobRunner; import org.elasticsearch.xpack.prelert.scheduler.ScheduledJobRunner;
import org.elasticsearch.xpack.prelert.scheduler.http.HttpDataExtractorFactory;
import org.elasticsearch.xpack.prelert.utils.NamedPipeHelper; import org.elasticsearch.xpack.prelert.utils.NamedPipeHelper;
import java.io.IOException; import java.io.IOException;
@ -209,8 +208,6 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
jobResultsPersister, jobRenormalizedResultsPersister, jobDataCountsPersister, autodetectResultsParser, jobResultsPersister, jobRenormalizedResultsPersister, jobDataCountsPersister, autodetectResultsParser,
autodetectProcessFactory, normalizerFactory); autodetectProcessFactory, normalizerFactory);
ScheduledJobRunner scheduledJobRunner = new ScheduledJobRunner(threadPool, client, clusterService, jobProvider, ScheduledJobRunner scheduledJobRunner = new ScheduledJobRunner(threadPool, client, clusterService, jobProvider,
// norelease: we will no longer need to pass the client here after we switch to a client based data extractor
new HttpDataExtractorFactory(client, searchRequestParsers),
System::currentTimeMillis); System::currentTimeMillis);
TaskManager taskManager = new TaskManager(settings); TaskManager taskManager = new TaskManager(settings);

View File

@ -43,6 +43,7 @@ public class DataDescription extends ToXContentToBytes implements Writeable {
JSON("json"), JSON("json"),
DELIMITED("delimited"), DELIMITED("delimited"),
SINGLE_LINE("single_line"), SINGLE_LINE("single_line"),
// TODO norelease, this can now be removed
ELASTICSEARCH("elasticsearch"); ELASTICSEARCH("elasticsearch");
/** /**

View File

@ -22,7 +22,6 @@ abstract class AbstractJsonRecordReader implements JsonRecordReader {
// NORELEASE - Remove direct dependency on Jackson // NORELEASE - Remove direct dependency on Jackson
protected final JsonParser parser; protected final JsonParser parser;
protected final Map<String, Integer> fieldMap; protected final Map<String, Integer> fieldMap;
protected final String recordHoldingField;
protected final Logger logger; protected final Logger logger;
protected int nestedLevel; protected int nestedLevel;
protected long fieldCount; protected long fieldCount;
@ -35,36 +34,15 @@ abstract class AbstractJsonRecordReader implements JsonRecordReader {
* The JSON parser * The JSON parser
* @param fieldMap * @param fieldMap
* Map to field name to record array index position * Map to field name to record array index position
* @param recordHoldingField
* record holding field
* @param logger * @param logger
* the logger * the logger
*/ */
AbstractJsonRecordReader(JsonParser parser, Map<String, Integer> fieldMap, String recordHoldingField, Logger logger) { AbstractJsonRecordReader(JsonParser parser, Map<String, Integer> fieldMap, Logger logger) {
this.parser = Objects.requireNonNull(parser); this.parser = Objects.requireNonNull(parser);
this.fieldMap = Objects.requireNonNull(fieldMap); this.fieldMap = Objects.requireNonNull(fieldMap);
this.recordHoldingField = Objects.requireNonNull(recordHoldingField);
this.logger = Objects.requireNonNull(logger); this.logger = Objects.requireNonNull(logger);
} }
protected void consumeToField(String field) throws IOException {
if (field == null || field.isEmpty()) {
return;
}
JsonToken token = null;
while ((token = tryNextTokenOrReadToEndOnError()) != null) {
if (token == JsonToken.FIELD_NAME
&& parser.getCurrentName().equals(field)) {
tryNextTokenOrReadToEndOnError();
return;
}
}
}
protected void consumeToRecordHoldingField() throws IOException {
consumeToField(recordHoldingField);
}
protected void initArrays(String[] record, boolean[] gotFields) { protected void initArrays(String[] record, boolean[] gotFields) {
Arrays.fill(gotFields, false); Arrays.fill(gotFields, false);
Arrays.fill(record, ""); Arrays.fill(record, "");

View File

@ -11,7 +11,6 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.xpack.prelert.job.AnalysisConfig; import org.elasticsearch.xpack.prelert.job.AnalysisConfig;
import org.elasticsearch.xpack.prelert.job.DataCounts; import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.DataDescription; import org.elasticsearch.xpack.prelert.job.DataDescription;
import org.elasticsearch.xpack.prelert.job.DataDescription.DataFormat;
import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectProcess; import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.prelert.job.status.StatusReporter; import org.elasticsearch.xpack.prelert.job.status.StatusReporter;
import org.elasticsearch.xpack.prelert.job.transform.TransformConfigs; import org.elasticsearch.xpack.prelert.job.transform.TransformConfigs;
@ -32,7 +31,6 @@ import java.util.function.Supplier;
* detailed description. * detailed description.
*/ */
class JsonDataToProcessWriter extends AbstractDataToProcessWriter { class JsonDataToProcessWriter extends AbstractDataToProcessWriter {
private static final String ELASTICSEARCH_SOURCE_FIELD = "_source";
public JsonDataToProcessWriter(boolean includeControlField, AutodetectProcess autodetectProcess, DataDescription dataDescription, public JsonDataToProcessWriter(boolean includeControlField, AutodetectProcess autodetectProcess, DataDescription dataDescription,
AnalysisConfig analysisConfig, TransformConfigs transforms, StatusReporter statusReporter, AnalysisConfig analysisConfig, TransformConfigs transforms, StatusReporter statusReporter,
@ -73,7 +71,7 @@ class JsonDataToProcessWriter extends AbstractDataToProcessWriter {
// We never expect to get the control field // We never expect to get the control field
boolean[] gotFields = new boolean[analysisFields.size()]; boolean[] gotFields = new boolean[analysisFields.size()];
JsonRecordReader recordReader = new SimpleJsonRecordReader(parser, inFieldIndexes, getRecordHoldingField(), logger); JsonRecordReader recordReader = new SimpleJsonRecordReader(parser, inFieldIndexes, logger);
long inputFieldCount = recordReader.read(input, gotFields); long inputFieldCount = recordReader.read(input, gotFields);
while (inputFieldCount >= 0) { while (inputFieldCount >= 0) {
Arrays.fill(record, ""); Arrays.fill(record, "");
@ -96,13 +94,6 @@ class JsonDataToProcessWriter extends AbstractDataToProcessWriter {
} }
} }
private String getRecordHoldingField() {
if (dataDescription.getFormat().equals(DataFormat.ELASTICSEARCH)) {
return ELASTICSEARCH_SOURCE_FIELD;
}
return "";
}
/** /**
* Don't enforce the check that all the fields are present in JSON docs. * Don't enforce the check that all the fields are present in JSON docs.
* Always returns true * Always returns true

View File

@ -26,13 +26,11 @@ class SimpleJsonRecordReader extends AbstractJsonRecordReader {
* The JSON parser * The JSON parser
* @param fieldMap * @param fieldMap
* Map to field name to record array index position * Map to field name to record array index position
* @param recordHoldingField
* record field
* @param logger * @param logger
* logger * logger
*/ */
SimpleJsonRecordReader(JsonParser parser, Map<String, Integer> fieldMap, String recordHoldingField, Logger logger) { SimpleJsonRecordReader(JsonParser parser, Map<String, Integer> fieldMap, Logger logger) {
super(parser, fieldMap, recordHoldingField, logger); super(parser, fieldMap, logger);
} }
/** /**
@ -57,7 +55,6 @@ class SimpleJsonRecordReader extends AbstractJsonRecordReader {
initArrays(record, gotFields); initArrays(record, gotFields);
fieldCount = 0; fieldCount = 0;
clearNestedLevel(); clearNestedLevel();
consumeToRecordHoldingField();
JsonToken token = tryNextTokenOrReadToEndOnError(); JsonToken token = tryNextTokenOrReadToEndOnError();
while (!(token == JsonToken.END_OBJECT && nestedLevel == 0)) { while (!(token == JsonToken.END_OBJECT && nestedLevel == 0)) {

View File

@ -15,8 +15,9 @@ import org.elasticsearch.xpack.prelert.action.FlushJobAction;
import org.elasticsearch.xpack.prelert.action.PostDataAction; import org.elasticsearch.xpack.prelert.action.PostDataAction;
import org.elasticsearch.xpack.prelert.job.DataCounts; import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.audit.Auditor; import org.elasticsearch.xpack.prelert.job.audit.Auditor;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor;
import org.elasticsearch.xpack.prelert.job.messages.Messages; import org.elasticsearch.xpack.prelert.job.messages.Messages;
import org.elasticsearch.xpack.prelert.scheduler.extractor.DataExtractor;
import org.elasticsearch.xpack.prelert.scheduler.extractor.DataExtractorFactory;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
@ -26,29 +27,29 @@ import java.util.function.Supplier;
class ScheduledJob { class ScheduledJob {
private static final Logger LOGGER = Loggers.getLogger(ScheduledJob.class);
private static final int NEXT_TASK_DELAY_MS = 100; private static final int NEXT_TASK_DELAY_MS = 100;
private final Logger logger;
private final Auditor auditor; private final Auditor auditor;
private final String jobId; private final String jobId;
private final long frequencyMs; private final long frequencyMs;
private final long queryDelayMs; private final long queryDelayMs;
private final Client client; private final Client client;
private final DataExtractor dataExtractor; private final DataExtractorFactory dataExtractorFactory;
private final Supplier<Long> currentTimeSupplier; private final Supplier<Long> currentTimeSupplier;
private volatile DataExtractor dataExtractor;
private volatile long lookbackStartTimeMs; private volatile long lookbackStartTimeMs;
private volatile Long lastEndTimeMs; private volatile Long lastEndTimeMs;
private volatile boolean running = true; private volatile boolean running = true;
ScheduledJob(String jobId, long frequencyMs, long queryDelayMs, DataExtractor dataExtractor, ScheduledJob(String jobId, long frequencyMs, long queryDelayMs, DataExtractorFactory dataExtractorFactory,
Client client, Auditor auditor, Supplier<Long> currentTimeSupplier, Client client, Auditor auditor, Supplier<Long> currentTimeSupplier,
long latestFinalBucketEndTimeMs, long latestRecordTimeMs) { long latestFinalBucketEndTimeMs, long latestRecordTimeMs) {
this.logger = Loggers.getLogger(jobId);
this.jobId = jobId; this.jobId = jobId;
this.frequencyMs = frequencyMs; this.frequencyMs = frequencyMs;
this.queryDelayMs = queryDelayMs; this.queryDelayMs = queryDelayMs;
this.dataExtractor = dataExtractor; this.dataExtractorFactory = dataExtractorFactory;
this.client = client; this.client = client;
this.auditor = auditor; this.auditor = auditor;
this.currentTimeSupplier = currentTimeSupplier; this.currentTimeSupplier = currentTimeSupplier;
@ -82,7 +83,7 @@ class ScheduledJob {
request.setCalcInterim(true); request.setCalcInterim(true);
run(lookbackStartTimeMs, lookbackEnd, request); run(lookbackStartTimeMs, lookbackEnd, request);
auditor.info(Messages.getMessage(Messages.JOB_AUDIT_SCHEDULER_LOOKBACK_COMPLETED)); auditor.info(Messages.getMessage(Messages.JOB_AUDIT_SCHEDULER_LOOKBACK_COMPLETED));
logger.info("Lookback has finished"); LOGGER.info("[{}] Lookback has finished", jobId);
if (isLookbackOnly) { if (isLookbackOnly) {
return null; return null;
} else { } else {
@ -103,8 +104,10 @@ class ScheduledJob {
} }
public void stop() { public void stop() {
running = false; if (dataExtractor != null) {
dataExtractor.cancel(); dataExtractor.cancel();
}
running = false;
auditor.info(Messages.getMessage(Messages.JOB_AUDIT_SCHEDULER_STOPPED)); auditor.info(Messages.getMessage(Messages.JOB_AUDIT_SCHEDULER_STOPPED));
} }
@ -117,12 +120,12 @@ class ScheduledJob {
return; return;
} }
logger.trace("Searching data in: [" + start + ", " + end + ")"); LOGGER.trace("[{}] Searching data in: [{}, {})", jobId, start, end);
RuntimeException error = null; RuntimeException error = null;
long recordCount = 0; long recordCount = 0;
dataExtractor.newSearch(start, end, logger); dataExtractor = dataExtractorFactory.newExtractor(start, end);
while (running && dataExtractor.hasNext()) { while (dataExtractor.hasNext()) {
Optional<InputStream> extractedData; Optional<InputStream> extractedData;
try { try {
extractedData = dataExtractor.next(); extractedData = dataExtractor.next();
@ -152,6 +155,7 @@ class ScheduledJob {
} }
} }
} }
dataExtractor = null;
lastEndTimeMs = Math.max(lastEndTimeMs == null ? 0 : lastEndTimeMs, end - 1); lastEndTimeMs = Math.max(lastEndTimeMs == null ? 0 : lastEndTimeMs, end - 1);

View File

@ -24,14 +24,14 @@ import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobStatus; import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.audit.Auditor; import org.elasticsearch.xpack.prelert.job.audit.Auditor;
import org.elasticsearch.xpack.prelert.job.config.DefaultFrequency; import org.elasticsearch.xpack.prelert.job.config.DefaultFrequency;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractorFactory;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation; import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata; import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.job.persistence.BucketsQueryBuilder; import org.elasticsearch.xpack.prelert.job.persistence.BucketsQueryBuilder;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider; import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage; import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.job.results.Bucket; import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.scheduler.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.prelert.scheduler.extractor.scroll.ScrollDataExtractorFactory;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper; import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import java.time.Duration; import java.time.Duration;
@ -47,18 +47,16 @@ public class ScheduledJobRunner extends AbstractComponent {
private final Client client; private final Client client;
private final ClusterService clusterService; private final ClusterService clusterService;
private final JobProvider jobProvider; private final JobProvider jobProvider;
private final DataExtractorFactory dataExtractorFactory;
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final Supplier<Long> currentTimeSupplier; private final Supplier<Long> currentTimeSupplier;
public ScheduledJobRunner(ThreadPool threadPool, Client client, ClusterService clusterService, JobProvider jobProvider, public ScheduledJobRunner(ThreadPool threadPool, Client client, ClusterService clusterService, JobProvider jobProvider,
DataExtractorFactory dataExtractorFactory, Supplier<Long> currentTimeSupplier) { Supplier<Long> currentTimeSupplier) {
super(Settings.EMPTY); super(Settings.EMPTY);
this.threadPool = threadPool;
this.clusterService = Objects.requireNonNull(clusterService);
this.client = Objects.requireNonNull(client); this.client = Objects.requireNonNull(client);
this.clusterService = Objects.requireNonNull(clusterService);
this.jobProvider = Objects.requireNonNull(jobProvider); this.jobProvider = Objects.requireNonNull(jobProvider);
this.dataExtractorFactory = Objects.requireNonNull(dataExtractorFactory); this.threadPool = threadPool;
this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier); this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier);
} }
@ -192,14 +190,18 @@ public class ScheduledJobRunner extends AbstractComponent {
Auditor auditor = jobProvider.audit(job.getId()); Auditor auditor = jobProvider.audit(job.getId());
Duration frequency = getFrequencyOrDefault(scheduler, job); Duration frequency = getFrequencyOrDefault(scheduler, job);
Duration queryDelay = Duration.ofSeconds(scheduler.getConfig().getQueryDelay()); Duration queryDelay = Duration.ofSeconds(scheduler.getConfig().getQueryDelay());
DataExtractor dataExtractor = dataExtractorFactory.newExtractor(scheduler.getConfig(), job); DataExtractorFactory dataExtractorFactory = createDataExtractorFactory(scheduler.getConfig(), job);
ScheduledJob scheduledJob = new ScheduledJob(job.getId(), frequency.toMillis(), queryDelay.toMillis(), ScheduledJob scheduledJob = new ScheduledJob(job.getId(), frequency.toMillis(), queryDelay.toMillis(),
dataExtractor, client, auditor, currentTimeSupplier, finalBucketEndMs, latestRecordTimeMs); dataExtractorFactory, client, auditor, currentTimeSupplier, finalBucketEndMs, latestRecordTimeMs);
Holder holder = new Holder(scheduler, scheduledJob, new ProblemTracker(() -> auditor), handler); Holder holder = new Holder(scheduler, scheduledJob, new ProblemTracker(() -> auditor), handler);
task.setHolder(holder); task.setHolder(holder);
return holder; return holder;
} }
DataExtractorFactory createDataExtractorFactory(SchedulerConfig schedulerConfig, Job job) {
return new ScrollDataExtractorFactory(client, schedulerConfig, job);
}
private void gatherInformation(String jobId, BiConsumer<QueryPage<Bucket>, DataCounts> handler, Consumer<Exception> errorHandler) { private void gatherInformation(String jobId, BiConsumer<QueryPage<Bucket>, DataCounts> handler, Consumer<Exception> errorHandler) {
BucketsQueryBuilder.BucketsQuery latestBucketQuery = new BucketsQueryBuilder() BucketsQueryBuilder.BucketsQuery latestBucketQuery = new BucketsQueryBuilder()
.sortField(Bucket.TIMESTAMP.getPreferredName()) .sortField(Bucket.TIMESTAMP.getPreferredName())

View File

@ -3,28 +3,13 @@
* or more contributor license agreements. Licensed under the Elastic License; * or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License. * you may not use this file except in compliance with the Elastic License.
*/ */
package org.elasticsearch.xpack.prelert.job.extraction; package org.elasticsearch.xpack.prelert.scheduler.extractor;
import org.apache.logging.log4j.Logger;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Optional; import java.util.Optional;
public interface DataExtractor { public interface DataExtractor {
/**
* Set-up the extractor for a new search
*
* @param start start time
* @param end end time
* @param logger logger
*/
void newSearch(long start, long end, Logger logger) throws IOException;
/**
* Cleans up after a search.
*/
void clear();
/** /**
* @return {@code true} if the search has not finished yet, or {@code false} otherwise * @return {@code true} if the search has not finished yet, or {@code false} otherwise

View File

@ -3,11 +3,8 @@
* or more contributor license agreements. Licensed under the Elastic License; * or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License. * you may not use this file except in compliance with the Elastic License.
*/ */
package org.elasticsearch.xpack.prelert.job.extraction; package org.elasticsearch.xpack.prelert.scheduler.extractor;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfig;
public interface DataExtractorFactory { public interface DataExtractorFactory {
DataExtractor newExtractor(SchedulerConfig schedulerConfig, Job job); DataExtractor newExtractor(long start, long end);
} }

View File

@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.scheduler.extractor;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
public final class SearchHitFieldExtractor {
private SearchHitFieldExtractor() {}
public static Object[] extractField(SearchHit hit, String field) {
SearchHitField keyValue = hit.field(field);
if (keyValue != null) {
List<Object> values = keyValue.values();
return values.toArray(new Object[values.size()]);
} else {
return extractFieldFromSource(hit.getSource(), field);
}
}
private static Object[] extractFieldFromSource(Map<String, Object> source, String field) {
if (source != null) {
Object values = source.get(field);
if (values != null) {
if (values instanceof Object[]) {
return (Object[]) values;
} else {
return new Object[]{values};
}
}
}
return new Object[0];
}
public static Long extractTimeField(SearchHit hit, String timeField) {
Object[] fields = extractField(hit, timeField);
if (fields.length != 1) {
throw new RuntimeException("Time field [" + timeField + "] expected a single value; actual was: " + Arrays.toString(fields));
}
if (fields[0] instanceof Long) {
return (Long) fields[0];
}
throw new RuntimeException("Time field [" + timeField + "] expected a long value; actual was: " + fields[0]);
}
}

View File

@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.scheduler.extractor;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.search.SearchHit;
import java.io.IOException;
import java.io.OutputStream;
public class SearchHitToJsonProcessor implements Releasable {
private final String[] fields;
private final XContentBuilder jsonBuilder;
public SearchHitToJsonProcessor(String[] fields, OutputStream outputStream) throws IOException {
this.fields = fields;
jsonBuilder = new XContentBuilder(JsonXContent.jsonXContent, outputStream);
}
public void process(SearchHit hit) throws IOException {
jsonBuilder.startObject();
for (String field : fields) {
writeKeyValue(field, SearchHitFieldExtractor.extractField(hit, field));
}
jsonBuilder.endObject();
}
private void writeKeyValue(String key, Object... values) throws IOException {
if (values.length == 0) {
return;
}
if (values.length == 1) {
jsonBuilder.field(key, values[0]);
} else {
jsonBuilder.array(key, values);
}
}
@Override
public void close() {
jsonBuilder.close();
}
}

View File

@ -0,0 +1,171 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.scheduler.extractor.scroll;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.ClearScrollAction;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollAction;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.prelert.scheduler.extractor.DataExtractor;
import org.elasticsearch.xpack.prelert.scheduler.extractor.SearchHitFieldExtractor;
import org.elasticsearch.xpack.prelert.scheduler.extractor.SearchHitToJsonProcessor;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
class ScrollDataExtractor implements DataExtractor {
private static final Logger LOGGER = Loggers.getLogger(ScrollDataExtractor.class);
private static final TimeValue SCROLL_TIMEOUT = new TimeValue(10, TimeUnit.MINUTES);
private final Client client;
private final ScrollDataExtractorContext context;
private volatile String scrollId;
private volatile boolean isCancelled;
private volatile boolean hasNext;
private volatile Long timestampOnCancel;
public ScrollDataExtractor(Client client, ScrollDataExtractorContext dataExtractorContext) {
this.client = Objects.requireNonNull(client);
this.context = Objects.requireNonNull(dataExtractorContext);
this.hasNext = true;
}
@Override
public boolean hasNext() {
return hasNext;
}
@Override
public void cancel() {
LOGGER.trace("[{}] Data extractor received cancel request", context.jobId);
isCancelled = true;
}
@Override
public Optional<InputStream> next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException();
}
Optional<InputStream> stream = scrollId == null ? Optional.ofNullable(initScroll()) : Optional.ofNullable(continueScroll());
if (!stream.isPresent()) {
hasNext = false;
}
return stream;
}
private InputStream initScroll() throws IOException {
SearchResponse searchResponse = executeSearchRequest(buildSearchRequest());
if (searchResponse.status() != RestStatus.OK) {
throw new IOException("[" + context.jobId + "] Search request returned status code: " + searchResponse.status()
+ ". Response was:\n" + searchResponse.toString());
}
return processSearchResponse(searchResponse);
}
protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) {
return searchRequestBuilder.get();
}
private SearchRequestBuilder buildSearchRequest() {
SearchRequestBuilder searchRequestBuilder = SearchAction.INSTANCE.newRequestBuilder(client)
.setScroll(SCROLL_TIMEOUT)
.setSearchType(SearchType.QUERY_AND_FETCH)
.addSort(context.timeField, SortOrder.ASC)
.setIndices(context.indexes)
.setTypes(context.types)
.setSize(context.scrollSize)
.setQuery(createQuery());
if (context.aggregations != null) {
searchRequestBuilder.setSize(0);
for (AggregationBuilder aggregationBuilder : context.aggregations.getAggregatorFactories()) {
searchRequestBuilder.addAggregation(aggregationBuilder);
}
for (PipelineAggregationBuilder pipelineAggregationBuilder : context.aggregations.getPipelineAggregatorFactories()) {
searchRequestBuilder.addAggregation(pipelineAggregationBuilder);
}
}
for (SearchSourceBuilder.ScriptField scriptField : context.scriptFields) {
searchRequestBuilder.addScriptField(scriptField.fieldName(), scriptField.script());
}
return searchRequestBuilder;
}
private InputStream processSearchResponse(SearchResponse searchResponse) throws IOException {
scrollId = searchResponse.getScrollId();
if (searchResponse.getHits().hits().length == 0) {
hasNext = false;
return null;
}
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try (SearchHitToJsonProcessor hitProcessor = new SearchHitToJsonProcessor(context.jobFields, outputStream)) {
for (SearchHit hit : searchResponse.getHits().hits()) {
if (isCancelled) {
Long timestamp = SearchHitFieldExtractor.extractTimeField(hit, context.timeField);
if (timestamp != null) {
if (timestampOnCancel == null) {
timestampOnCancel = timestamp;
} else if (timestamp != timestampOnCancel) {
hasNext = false;
clearScroll(scrollId);
break;
}
}
}
hitProcessor.process(hit);
}
}
return new ByteArrayInputStream(outputStream.toByteArray());
}
private InputStream continueScroll() throws IOException {
SearchResponse searchResponse = executeSearchScrollRequest(scrollId);
if (searchResponse.status() != RestStatus.OK) {
throw new IOException("[" + context.jobId + "] Continue search scroll request with id '" + scrollId + "' returned status code: "
+ searchResponse.status() + ". Response was:\n" + searchResponse.toString());
}
return processSearchResponse(searchResponse);
}
protected SearchResponse executeSearchScrollRequest(String scrollId) {
return SearchScrollAction.INSTANCE.newRequestBuilder(client)
.setScroll(SCROLL_TIMEOUT)
.setScrollId(scrollId)
.get();
}
private QueryBuilder createQuery() {
QueryBuilder userQuery = context.query;
QueryBuilder timeQuery = new RangeQueryBuilder(context.timeField).gte(context.start).lt(context.end).format("epoch_millis");
return new BoolQueryBuilder().filter(userQuery).filter(timeQuery);
}
void clearScroll(String scrollId) {
ClearScrollAction.INSTANCE.newRequestBuilder(client).addScrollId(scrollId).get();
}
}

View File

@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.scheduler.extractor.scroll;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.util.List;
import java.util.Objects;
public class ScrollDataExtractorContext {
final String jobId;
final String[] jobFields;
final String timeField;
final String[] indexes;
final String[] types;
final QueryBuilder query;
@Nullable
final AggregatorFactories.Builder aggregations;
final List<SearchSourceBuilder.ScriptField> scriptFields;
final int scrollSize;
final long start;
final long end;
public ScrollDataExtractorContext(String jobId, List<String> jobFields, String timeField, List<String> indexes, List<String> types,
QueryBuilder query, @Nullable AggregatorFactories.Builder aggregations,
List<SearchSourceBuilder.ScriptField> scriptFields, int scrollSize, long start, long end) {
this.jobId = Objects.requireNonNull(jobId);
this.jobFields = jobFields.toArray(new String[jobFields.size()]);
this.timeField = Objects.requireNonNull(timeField);
this.indexes = indexes.toArray(new String[indexes.size()]);
this.types = types.toArray(new String[types.size()]);
this.query = Objects.requireNonNull(query);
this.aggregations = aggregations;
this.scriptFields = Objects.requireNonNull(scriptFields);
this.scrollSize = scrollSize;
this.start = start;
this.end = end;
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.scheduler.extractor.scroll;
import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfig;
import org.elasticsearch.xpack.prelert.scheduler.extractor.DataExtractor;
import org.elasticsearch.xpack.prelert.scheduler.extractor.DataExtractorFactory;
import java.util.Objects;
public class ScrollDataExtractorFactory implements DataExtractorFactory {
private final Client client;
private final SchedulerConfig schedulerConfig;
private final Job job;
public ScrollDataExtractorFactory(Client client, SchedulerConfig schedulerConfig, Job job) {
this.client = Objects.requireNonNull(client);
this.schedulerConfig = Objects.requireNonNull(schedulerConfig);
this.job = Objects.requireNonNull(job);
}
@Override
public DataExtractor newExtractor(long start, long end) {
String timeField = job.getDataDescription().getTimeField();
ScrollDataExtractorContext dataExtractorContext = new ScrollDataExtractorContext(
job.getId(),
job.allFields(),
timeField,
schedulerConfig.getIndexes(),
schedulerConfig.getTypes(),
schedulerConfig.getQuery(),
schedulerConfig.getAggregations(),
schedulerConfig.getScriptFields(),
schedulerConfig.getScrollSize(),
start,
end);
return new ScrollDataExtractor(client, dataExtractorContext);
}
}

View File

@ -1,250 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.scheduler.http;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor;
import java.io.IOException;
import java.io.InputStream;
import java.util.Locale;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class ElasticsearchDataExtractor implements DataExtractor {
private static final String CLEAR_SCROLL_TEMPLATE = "{\"scroll_id\":[\"%s\"]}";
private static final Pattern TOTAL_HITS_PATTERN = Pattern.compile("\"hits\":\\{.*?\"total\":(.*?),");
private static final Pattern EARLIEST_TIME_PATTERN = Pattern.compile("\"earliestTime\":\\{.*?\"value\":(.*?),");
private static final Pattern LATEST_TIME_PATTERN = Pattern.compile("\"latestTime\":\\{.*?\"value\":(.*?),");
private static final Pattern INDEX_PATTERN = Pattern.compile("\"_index\":\"(.*?)\"");
private static final Pattern NUMBER_OF_SHARDS_PATTERN = Pattern.compile("\"number_of_shards\":\"(.*?)\"");
private static final long CHUNK_THRESHOLD_MS = 3600000;
private static final long MIN_CHUNK_SIZE_MS = 10000L;
private final HttpRequester httpRequester;
private final ElasticsearchUrlBuilder urlBuilder;
private final ElasticsearchQueryBuilder queryBuilder;
private final int scrollSize;
private final ScrollState scrollState;
private volatile long currentStartTime;
private volatile long currentEndTime;
private volatile long endTime;
private volatile boolean isFirstSearch;
private volatile boolean isCancelled;
/**
* The interval of each scroll search. Will be null when search is not chunked.
*/
private volatile Long m_Chunk;
private volatile Logger m_Logger;
public ElasticsearchDataExtractor(HttpRequester httpRequester, ElasticsearchUrlBuilder urlBuilder,
ElasticsearchQueryBuilder queryBuilder, int scrollSize) {
this.httpRequester = Objects.requireNonNull(httpRequester);
this.urlBuilder = Objects.requireNonNull(urlBuilder);
this.scrollSize = scrollSize;
this.queryBuilder = Objects.requireNonNull(queryBuilder);
scrollState = queryBuilder.isAggregated() ? ScrollState.createAggregated() : ScrollState.createDefault();
isFirstSearch = true;
}
@Override
public void newSearch(long startEpochMs, long endEpochMs, Logger logger) throws IOException {
m_Logger = logger;
m_Logger.info("Requesting data from '" + urlBuilder.getBaseUrl() + "' within [" + startEpochMs + ", " + endEpochMs + ")");
scrollState.reset();
currentStartTime = startEpochMs;
currentEndTime = endEpochMs;
endTime = endEpochMs;
isCancelled = false;
if (endEpochMs - startEpochMs > CHUNK_THRESHOLD_MS) {
setUpChunkedSearch();
}
if (isFirstSearch) {
queryBuilder.logQueryInfo(m_Logger);
isFirstSearch = false;
}
}
private void setUpChunkedSearch() throws IOException {
m_Chunk = null;
String url = urlBuilder.buildSearchSizeOneUrl();
String response = requestAndGetStringResponse(url, queryBuilder.createDataSummaryQuery(currentStartTime, endTime));
long totalHits = matchLong(response, TOTAL_HITS_PATTERN);
if (totalHits > 0) {
// Aggregation value may be a double
currentStartTime = (long) matchDouble(response, EARLIEST_TIME_PATTERN);
long latestTime = (long) matchDouble(response, LATEST_TIME_PATTERN);
long dataTimeSpread = latestTime - currentStartTime;
String index = matchString(response, INDEX_PATTERN);
long shards = readNumberOfShards(index);
m_Chunk = Math.max(MIN_CHUNK_SIZE_MS, (shards * scrollSize * dataTimeSpread) / totalHits);
currentEndTime = currentStartTime + m_Chunk;
m_Logger.debug("Chunked search configured: totalHits = " + totalHits
+ ", dataTimeSpread = " + dataTimeSpread + " ms, chunk size = " + m_Chunk
+ " ms");
} else {
currentStartTime = endTime;
}
}
private String requestAndGetStringResponse(String url, String body) throws IOException {
m_Logger.trace("url ={}, body={}", url, body);
HttpResponse response = httpRequester.get(url, body);
if (response.getResponseCode() != HttpResponse.OK_STATUS) {
throw new IOException("Request '" + url + "' failed with status code: "
+ response.getResponseCode() + ". Response was:\n" + response.getResponseAsString());
}
return response.getResponseAsString();
}
private static long matchLong(String response, Pattern pattern) throws IOException {
String match = matchString(response, pattern);
try {
return Long.parseLong(match);
} catch (NumberFormatException e) {
throw new IOException("Failed to parse long from pattern '" + pattern + "'. Response was:\n" + response, e);
}
}
private static double matchDouble(String response, Pattern pattern) throws IOException {
String match = matchString(response, pattern);
try {
return Double.parseDouble(match);
} catch (NumberFormatException e) {
throw new IOException("Failed to parse double from pattern '" + pattern + "'. Response was:\n" + response, e);
}
}
private static String matchString(String response, Pattern pattern) throws IOException {
Matcher matcher = pattern.matcher(response);
if (!matcher.find()) {
throw new IOException("Failed to parse string from pattern '" + pattern + "'. Response was:\n" + response);
}
return matcher.group(1);
}
private long readNumberOfShards(String index) throws IOException {
String url = urlBuilder.buildIndexSettingsUrl(index);
String response = requestAndGetStringResponse(url, null);
return matchLong(response, NUMBER_OF_SHARDS_PATTERN);
}
@Override
public void clear() {
scrollState.reset();
}
@Override
public Optional<InputStream> next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException();
}
try {
return getNextStream();
} catch (IOException e) {
m_Logger.error("An error occurred during requesting data from: " + urlBuilder.getBaseUrl(), e);
scrollState.forceComplete();
throw e;
}
}
private Optional<InputStream> getNextStream() throws IOException {
while (hasNext()) {
boolean isNewScroll = scrollState.getScrollId() == null || scrollState.isComplete();
InputStream stream = isNewScroll ? initScroll() : continueScroll();
stream = scrollState.updateFromStream(stream);
if (scrollState.isComplete()) {
clearScroll();
advanceTime();
// If it was a new scroll it means it returned 0 hits. If we are doing
// a chunked search, we reconfigure the search in order to jump to the next
// time interval where there are data.
if (isNewScroll && hasNext() && !isCancelled && m_Chunk != null) {
setUpChunkedSearch();
}
} else {
return Optional.of(stream);
}
}
return Optional.empty();
}
private void clearScroll() {
if (scrollState.getScrollId() == null) {
return;
}
String url = urlBuilder.buildClearScrollUrl();
try {
HttpResponse response = httpRequester.delete(url, String.format(Locale.ROOT, CLEAR_SCROLL_TEMPLATE, scrollState.getScrollId()));
// This is necessary to ensure the response stream has been consumed entirely.
// Failing to do this can cause a lot of issues with Elasticsearch when
// scheduled jobs are running concurrently.
response.getResponseAsString();
} catch (IOException e) {
m_Logger.error("An error ocurred during clearing scroll context", e);
}
scrollState.clearScrollId();
}
@Override
public boolean hasNext() {
return !scrollState.isComplete() || (!isCancelled && currentStartTime < endTime);
}
private InputStream initScroll() throws IOException {
String url = buildInitScrollUrl();
String searchBody = queryBuilder.createSearchBody(currentStartTime, currentEndTime);
m_Logger.trace("About to submit body " + searchBody + " to URL " + url);
HttpResponse response = httpRequester.get(url, searchBody);
if (response.getResponseCode() != HttpResponse.OK_STATUS) {
throw new IOException("Request '" + url + "' failed with status code: "
+ response.getResponseCode() + ". Response was:\n" + response.getResponseAsString());
}
return response.getStream();
}
private void advanceTime() {
currentStartTime = currentEndTime;
currentEndTime = m_Chunk == null ? endTime : Math.min(currentStartTime + m_Chunk, endTime);
}
private String buildInitScrollUrl() throws IOException {
// With aggregations we don't want any hits returned for the raw data,
// just the aggregations
int size = queryBuilder.isAggregated() ? 0 : scrollSize;
return urlBuilder.buildInitScrollUrl(size);
}
private InputStream continueScroll() throws IOException {
// Aggregations never need a continuation
if (!queryBuilder.isAggregated()) {
String url = urlBuilder.buildContinueScrollUrl();
HttpResponse response = httpRequester.get(url, scrollState.getScrollId());
if (response.getResponseCode() == HttpResponse.OK_STATUS) {
return response.getStream();
}
throw new IOException("Request '" + url + "' with scroll id '"
+ scrollState.getScrollId() + "' failed with status code: "
+ response.getResponseCode() + ". Response was:\n"
+ response.getResponseAsString());
}
return null;
}
@Override
public void cancel() {
isCancelled = true;
}
}

View File

@ -1,129 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.scheduler.http;
import org.apache.logging.log4j.Logger;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
import java.util.Objects;
public class ElasticsearchQueryBuilder {
/**
* The search body for Elasticsearch version 2.x contains sorting
* based on the time field and a query. The query is composed by
* a bool query with two must clauses, the recommended way to perform an AND query.
* There are 6 placeholders:
* <ol>
* <li> sort field
* <li> user defined query
* <li> time field
* <li> start time (String in date_time format)
* <li> end time (String in date_time format)
* <li> extra (may be empty or contain aggregations, fields, etc.)
* </ol>
*/
private static final String SEARCH_BODY_TEMPLATE_2_X = "{"
+ "\"sort\": ["
+ "{\"%s\": {\"order\": \"asc\"}}"
+ "],"
+ "\"query\": {"
+ "\"bool\": {"
+ "\"filter\": ["
+ "%s,"
+ "{"
+ "\"range\": {"
+ "\"%s\": {"
+ "\"gte\": \"%s\","
+ "\"lt\": \"%s\","
+ "\"format\": \"date_time\""
+ "}"
+ "}"
+ "}"
+ "]"
+ "}"
+ "}%s"
+ "}";
private static final String DATA_SUMMARY_SORT_FIELD = "_doc";
/**
* Aggregations in order to retrieve the earliest and latest record times.
* The single placeholder expects the time field.
*/
private static final String DATA_SUMMARY_AGGS_TEMPLATE = ""
+ "{"
+ "\"earliestTime\":{"
+ "\"min\":{\"field\":\"%1$s\"}"
+ "},"
+ "\"latestTime\":{"
+ "\"max\":{\"field\":\"%1$s\"}"
+ "}"
+ "}";
private static final String AGGREGATION_TEMPLATE = ", \"aggs\": %s";
private static final String SCRIPT_FIELDS_TEMPLATE = ", \"script_fields\": %s";
private final String search;
private final String aggregations;
private final String scriptFields;
private final String timeField;
public ElasticsearchQueryBuilder(String search, String aggs, String scriptFields, String timeField) {
this.search = Objects.requireNonNull(search);
aggregations = aggs;
this.scriptFields = scriptFields;
this.timeField = Objects.requireNonNull(timeField);
}
public String createSearchBody(long start, long end) {
return createSearchBody(start, end, timeField, aggregations);
}
private String createSearchBody(long start, long end, String sortField, String aggs) {
return String.format(Locale.ROOT, SEARCH_BODY_TEMPLATE_2_X, sortField, search, timeField, formatAsDateTime(start),
formatAsDateTime(end), createResultsFormatSpec(aggs));
}
private static String formatAsDateTime(long epochMs) {
Instant instant = Instant.ofEpochMilli(epochMs);
ZonedDateTime dateTime = ZonedDateTime.ofInstant(instant, ZoneOffset.UTC);
return dateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX", Locale.ROOT));
}
private String createResultsFormatSpec(String aggs) {
return (aggs != null) ? createAggregations(aggs) : createScriptFields();
}
private String createAggregations(String aggs) {
return String.format(Locale.ROOT, AGGREGATION_TEMPLATE, aggs);
}
private String createScriptFields() {
return (scriptFields != null) ? String.format(Locale.ROOT, SCRIPT_FIELDS_TEMPLATE, scriptFields) : "";
}
public String createDataSummaryQuery(long start, long end) {
String aggs = String.format(Locale.ROOT, DATA_SUMMARY_AGGS_TEMPLATE, timeField);
return createSearchBody(start, end, DATA_SUMMARY_SORT_FIELD, aggs);
}
public void logQueryInfo(Logger logger) {
if (aggregations != null) {
logger.debug("Will use the following Elasticsearch aggregations: " + aggregations);
} else {
logger.debug("Will retrieve whole _source document from Elasticsearch");
}
}
public boolean isAggregated() {
return aggregations != null;
}
}

View File

@ -1,86 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.scheduler.http;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.stream.Collectors;
public class ElasticsearchUrlBuilder {
private static final String SLASH = "/";
private static final String COMMA = ",";
private static final int SCROLL_CONTEXT_MINUTES = 60;
private static final String INDEX_SETTINGS_END_POINT = "%s/_settings";
private static final String SEARCH_SIZE_ONE_END_POINT = "_search?size=1";
private static final String SEARCH_SCROLL_END_POINT = "_search?scroll=" + SCROLL_CONTEXT_MINUTES + "m&size=%d";
private static final String CONTINUE_SCROLL_END_POINT = "_search/scroll?scroll=" + SCROLL_CONTEXT_MINUTES + "m";
private static final String CLEAR_SCROLL_END_POINT = "_search/scroll";
private final String baseUrl;
private final String indexes;
private final String types;
private ElasticsearchUrlBuilder(String baseUrl, String indexes, String types) {
this.baseUrl = Objects.requireNonNull(baseUrl);
this.indexes = Objects.requireNonNull(indexes);
this.types = Objects.requireNonNull(types);
}
public static ElasticsearchUrlBuilder create(List<String> indexes, List<String> types) {
// norelease: This class will be removed once we switch to a client based data extractor
return create(indexes, types, "http://localhost:9200/");
}
public static ElasticsearchUrlBuilder create(List<String> indexes, List<String> types, String baseUrl) {
String indexesAsString = indexes.stream().collect(Collectors.joining(COMMA));
String typesAsString = types.stream().collect(Collectors.joining(COMMA));
return new ElasticsearchUrlBuilder(baseUrl, indexesAsString, typesAsString);
}
public String buildIndexSettingsUrl(String index) {
return newUrlBuilder().append(String.format(Locale.ROOT, INDEX_SETTINGS_END_POINT, index)).toString();
}
public String buildSearchSizeOneUrl() {
return buildUrlWithIndicesAndTypes().append(SEARCH_SIZE_ONE_END_POINT).toString();
}
public String buildInitScrollUrl(int scrollSize) {
return buildUrlWithIndicesAndTypes()
.append(String.format(Locale.ROOT, SEARCH_SCROLL_END_POINT, scrollSize))
.toString();
}
public String buildContinueScrollUrl() {
return newUrlBuilder().append(CONTINUE_SCROLL_END_POINT).toString();
}
public String buildClearScrollUrl() {
return newUrlBuilder().append(CLEAR_SCROLL_END_POINT).toString();
}
private StringBuilder newUrlBuilder() {
return new StringBuilder(baseUrl);
}
private StringBuilder buildUrlWithIndicesAndTypes() {
StringBuilder urlBuilder = buildUrlWithIndices();
if (!types.isEmpty()) {
urlBuilder.append(types).append(SLASH);
}
return urlBuilder;
}
private StringBuilder buildUrlWithIndices() {
return newUrlBuilder().append(indexes).append(SLASH);
}
public String getBaseUrl() {
return baseUrl;
}
}

View File

@ -1,112 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.scheduler.http;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.search.SearchRequestParsers;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractorFactory;
import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfig;
import org.elasticsearch.xpack.prelert.utils.FixBlockingClientOperations;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class HttpDataExtractorFactory implements DataExtractorFactory {
private static final Logger LOGGER = Loggers.getLogger(HttpDataExtractorFactory.class);
private final Client client;
private final SearchRequestParsers searchRequestParsers;
public HttpDataExtractorFactory(Client client, SearchRequestParsers searchRequestParsers) {
this.client = Objects.requireNonNull(client);
this.searchRequestParsers = Objects.requireNonNull(searchRequestParsers);
}
@Override
public DataExtractor newExtractor(SchedulerConfig schedulerConfig, Job job) {
String timeField = job.getDataDescription().getTimeField();
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(
xContentToJson(schedulerConfig.getQuery()),
stringifyAggregations(schedulerConfig.getAggregations()),
stringifyScriptFields(schedulerConfig.getScriptFields()),
timeField);
HttpRequester httpRequester = new HttpRequester();
ElasticsearchUrlBuilder urlBuilder = ElasticsearchUrlBuilder
.create(schedulerConfig.getIndexes(), schedulerConfig.getTypes(), getBaseUrl());
return new ElasticsearchDataExtractor(httpRequester, urlBuilder, queryBuilder, schedulerConfig.getScrollSize());
}
private String getBaseUrl() {
NodesInfoRequest request = new NodesInfoRequest();
NodesInfoResponse nodesInfoResponse = FixBlockingClientOperations.executeBlocking(client, NodesInfoAction.INSTANCE, request);
TransportAddress address = nodesInfoResponse.getNodes().get(0).getHttp().getAddress().publishAddress();
String baseUrl = "http://" + address.getAddress() + ":" + address.getPort() + "/";
LOGGER.info("Base URL: " + baseUrl);
return baseUrl;
}
private String xContentToJson(ToXContent xContent) {
try {
XContentBuilder jsonBuilder = JsonXContent.contentBuilder();
xContent.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
return jsonBuilder.string();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
String stringifyAggregations(AggregatorFactories.Builder aggregations) {
if (aggregations == null) {
return null;
}
return xContentToJson(aggregations);
}
String stringifyScriptFields(List<SearchSourceBuilder.ScriptField> scriptFields) {
if (scriptFields.isEmpty()) {
return null;
}
try {
XContentBuilder jsonBuilder = JsonXContent.contentBuilder();
jsonBuilder.startObject();
for (SearchSourceBuilder.ScriptField scriptField : scriptFields) {
scriptField.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
}
jsonBuilder.endObject();
return jsonBuilder.string();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private static String writeMapAsJson(Map<String, Object> map) {
try {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.map(map);
return builder.string();
} catch (IOException e) {
throw new ElasticsearchParseException("failed to convert map to JSON string", e);
}
}
}

View File

@ -1,160 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.scheduler.http;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.Loggers;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
/**
* Executes requests from an HTTP or HTTPS URL by sending a request body.
* HTTP or HTTPS is deduced from the supplied URL.
* Invalid certificates are tolerated for HTTPS access, similar to "curl -k".
*/
public class HttpRequester {
private static final Logger LOGGER = Loggers.getLogger(HttpRequester.class);
private static final String TLS = "TLS";
private static final String GET = "GET";
private static final String DELETE = "DELETE";
private static final String AUTH_HEADER = "Authorization";
private static final String CONTENT_TYPE_HEADER = "Content-Type";
private static final SSLSocketFactory TRUSTING_SOCKET_FACTORY;
private static final HostnameVerifier TRUSTING_HOSTNAME_VERIFIER;
private static final int CONNECT_TIMEOUT_MILLIS = 30000;
private static final int READ_TIMEOUT_MILLIS = 600000;
static {
SSLSocketFactory trustingSocketFactory = null;
try {
SSLContext sslContext = SSLContext.getInstance(TLS);
sslContext.init(null, new TrustManager[]{ new NoOpTrustManager() }, null);
trustingSocketFactory = sslContext.getSocketFactory();
} catch (KeyManagementException | NoSuchAlgorithmException e) {
LOGGER.warn("Unable to set up trusting socket factory", e);
}
TRUSTING_SOCKET_FACTORY = trustingSocketFactory;
TRUSTING_HOSTNAME_VERIFIER = new NoOpHostnameVerifier();
}
private final String authHeader;
private final String contentTypeHeader;
public HttpRequester() {
this(null);
}
public HttpRequester(String authHeader) {
this(authHeader, null);
}
public HttpRequester(String authHeader, String contentTypeHeader) {
this.authHeader = authHeader;
this.contentTypeHeader = contentTypeHeader;
}
public HttpResponse get(String url, String requestBody) throws IOException {
return request(url, requestBody, GET);
}
public HttpResponse delete(String url, String requestBody) throws IOException {
return request(url, requestBody, DELETE);
}
private HttpResponse request(String url, String requestBody, String method) throws IOException {
URL urlObject = new URL(url);
HttpURLConnection connection = (HttpURLConnection) urlObject.openConnection();
connection.setConnectTimeout(CONNECT_TIMEOUT_MILLIS);
connection.setReadTimeout(READ_TIMEOUT_MILLIS);
// TODO: we could add a config option to allow users who want to
// rigorously enforce valid certificates to do so
if (connection instanceof HttpsURLConnection) {
// This is the equivalent of "curl -k", i.e. tolerate connecting to
// an Elasticsearch with a self-signed certificate or a certificate
// that doesn't match its hostname.
HttpsURLConnection httpsConnection = (HttpsURLConnection)connection;
if (TRUSTING_SOCKET_FACTORY != null) {
httpsConnection.setSSLSocketFactory(TRUSTING_SOCKET_FACTORY);
}
httpsConnection.setHostnameVerifier(TRUSTING_HOSTNAME_VERIFIER);
}
connection.setRequestMethod(method);
if (authHeader != null) {
connection.setRequestProperty(AUTH_HEADER, authHeader);
}
if (contentTypeHeader != null) {
connection.setRequestProperty(CONTENT_TYPE_HEADER, contentTypeHeader);
}
if (requestBody != null) {
connection.setDoOutput(true);
writeRequestBody(requestBody, connection);
}
if (connection.getResponseCode() != HttpResponse.OK_STATUS) {
return new HttpResponse(connection.getErrorStream(), connection.getResponseCode());
}
return new HttpResponse(connection.getInputStream(), connection.getResponseCode());
}
private static void writeRequestBody(String requestBody, HttpURLConnection connection) throws IOException {
DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
dataOutputStream.writeBytes(requestBody);
dataOutputStream.flush();
dataOutputStream.close();
}
/**
* Hostname verifier that ignores hostname discrepancies.
*/
private static final class NoOpHostnameVerifier implements HostnameVerifier {
@Override
public boolean verify(String hostname, SSLSession session) {
return true;
}
}
/**
* Certificate trust manager that ignores certificate issues.
*/
private static final class NoOpTrustManager implements X509TrustManager {
private static final X509Certificate[] EMPTY_CERTIFICATE_ARRAY = new X509Certificate[0];
@Override
public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
// Ignore certificate problems
}
@Override
public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
// Ignore certificate problems
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return EMPTY_CERTIFICATE_ARRAY;
}
}
}

View File

@ -1,53 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.scheduler.http;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;
/**
* Encapsulates the HTTP response stream and the status code.
*
* <p><b>Important note</b>: The stream has to be consumed thoroughly.
* Java is keeping connections alive thus reusing them and any
* streams with dangling data can lead to problems.
*/
class HttpResponse {
public static final int OK_STATUS = 200;
private static final String NEW_LINE = "\n";
private final InputStream stream;
private final int responseCode;
public HttpResponse(InputStream responseStream, int responseCode) {
stream = responseStream;
this.responseCode = responseCode;
}
public int getResponseCode() {
return responseCode;
}
public InputStream getStream() {
return stream;
}
public String getResponseAsString() throws IOException {
return getStreamAsString(stream);
}
public static String getStreamAsString(InputStream stream) throws IOException {
try (BufferedReader buffer = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
return buffer.lines().collect(Collectors.joining(NEW_LINE));
}
}
}

View File

@ -1,166 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.scheduler.http;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Holds the state of an Elasticsearch scroll.
*/
class ScrollState {
private static final Pattern SCROLL_ID_PATTERN = Pattern.compile("\"_scroll_id\":\"(.*?)\"");
private static final Pattern DEFAULT_PEEK_END_PATTERN = Pattern.compile("\"hits\":\\[(.)");
private static final Pattern AGGREGATED_PEEK_END_PATTERN = Pattern.compile("\"aggregations\":.*?\"buckets\":\\[(.)");
private static final String CLOSING_SQUARE_BRACKET = "]";
/**
* We want to read up until the "hits" or "buckets" array. Scroll IDs can be
* quite long in clusters that have many nodes/shards. The longest reported
* scroll ID is 20708 characters - see
* http://elasticsearch-users.115913.n3.nabble.com/Ridiculously-long-Scroll-id-td4038567.html
* <br>
* We set a max byte limit for the stream peeking to 1 MB.
*/
private static final int MAX_PEEK_BYTES = 1024 * 1024;
/**
* We try to search for the scroll ID and whether the scroll is complete every time
* we have read a chunk of size 32 KB.
*/
private static final int PEEK_CHUNK_SIZE = 32 * 1024;
private final int peekMaxSize;
private final int peekChunkSize;
private final Pattern peekEndPattern;
private volatile String scrollId;
private volatile boolean isComplete;
private ScrollState(int peekMaxSize, int peekChunkSize, Pattern scrollCompletePattern) {
this.peekMaxSize = peekMaxSize;
this.peekChunkSize = peekChunkSize;
peekEndPattern = Objects.requireNonNull(scrollCompletePattern);
}
/**
* Creates a {@code ScrollState} for a search without aggregations
* @return the {@code ScrollState}
*/
public static ScrollState createDefault() {
return new ScrollState(MAX_PEEK_BYTES, PEEK_CHUNK_SIZE, DEFAULT_PEEK_END_PATTERN);
}
/**
* Creates a {@code ScrollState} for a search with aggregations
* @return the {@code ScrollState}
*/
public static ScrollState createAggregated() {
return new ScrollState(MAX_PEEK_BYTES, PEEK_CHUNK_SIZE, AGGREGATED_PEEK_END_PATTERN);
}
public final void reset() {
scrollId = null;
isComplete = false;
}
public void clearScrollId() {
scrollId = null;
}
public String getScrollId() {
return scrollId;
}
public boolean isComplete() {
return isComplete;
}
public void forceComplete() {
isComplete = true;
}
/**
* Peeks into the stream and updates the scroll ID and whether the scroll is complete.
* <p>
* <em>After calling that method the given stream cannot be reused.
* Use the returned stream instead.</em>
* </p>
*
* @param stream the stream
* @return a new {@code InputStream} object which should be used instead of the given stream
* for further processing
* @throws IOException if an I/O error occurs while manipulating the stream or the stream
* contains no scroll ID
*/
public InputStream updateFromStream(InputStream stream) throws IOException {
if (stream == null) {
isComplete = true;
return null;
}
PushbackInputStream pushbackStream = new PushbackInputStream(stream, peekMaxSize);
byte[] buffer = new byte[peekMaxSize];
int totalBytesRead = 0;
int currentChunkSize = 0;
int bytesRead = 0;
while (bytesRead >= 0 && totalBytesRead < peekMaxSize) {
bytesRead = stream.read(buffer, totalBytesRead, Math.min(peekChunkSize, peekMaxSize - totalBytesRead));
if (bytesRead > 0) {
totalBytesRead += bytesRead;
currentChunkSize += bytesRead;
}
if (bytesRead < 0 || currentChunkSize >= peekChunkSize) {
// We make the assumption here that invalid byte sequences will be read as invalid
// char rather than throwing an exception
String peekString = new String(buffer, 0, totalBytesRead, StandardCharsets.UTF_8);
if (matchScrollState(peekString)) {
break;
}
currentChunkSize = 0;
}
}
pushbackStream.unread(buffer, 0, totalBytesRead);
if (scrollId == null) {
throw new IOException("Field '_scroll_id' was expected but not found in first "
+ totalBytesRead + " bytes of response:\n"
+ HttpResponse.getStreamAsString(pushbackStream));
}
return pushbackStream;
}
/**
* Searches the peek end pattern into the given {@code sequence}. If it is matched,
* it also searches for the scroll ID and updates the state accordingly.
*
* @param sequence the String to search into
* @return {@code true} if the peek end pattern was matched or {@code false} otherwise
*/
private boolean matchScrollState(String sequence) {
Matcher peekEndMatcher = peekEndPattern.matcher(sequence);
if (peekEndMatcher.find()) {
Matcher scrollIdMatcher = SCROLL_ID_PATTERN.matcher(sequence);
if (!scrollIdMatcher.find()) {
scrollId = null;
} else {
scrollId = scrollIdMatcher.group(1);
isComplete = CLOSING_SQUARE_BRACKET.equals(peekEndMatcher.group(1));
return true;
}
}
return false;
}
}

View File

@ -5,16 +5,15 @@
*/ */
package org.elasticsearch.xpack.prelert.job.process.autodetect.writer; package org.elasticsearch.xpack.prelert.job.process.autodetect.writer;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.status.CountingInputStream; import org.elasticsearch.xpack.prelert.job.status.CountingInputStream;
import org.elasticsearch.xpack.prelert.job.status.StatusReporter; import org.elasticsearch.xpack.prelert.job.status.StatusReporter;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -31,7 +30,7 @@ public class SimpleJsonRecordReaderTests extends ESTestCase {
JsonParser parser = createParser(data); JsonParser parser = createParser(data);
Map<String, Integer> fieldMap = createFieldMap(); Map<String, Integer> fieldMap = createFieldMap();
SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, "", mock(Logger.class)); SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class));
String record[] = new String[3]; String record[] = new String[3];
boolean gotFields[] = new boolean[3]; boolean gotFields[] = new boolean[3];
@ -58,7 +57,7 @@ public class SimpleJsonRecordReaderTests extends ESTestCase {
fieldMap.put("b", 1); fieldMap.put("b", 1);
fieldMap.put("c.e", 2); fieldMap.put("c.e", 2);
SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, "", mock(Logger.class)); SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class));
String record[] = new String[3]; String record[] = new String[3];
boolean gotFields[] = new boolean[3]; boolean gotFields[] = new boolean[3];
@ -80,7 +79,7 @@ public class SimpleJsonRecordReaderTests extends ESTestCase {
fieldMap.put("b", 1); fieldMap.put("b", 1);
fieldMap.put("c.e", 2); fieldMap.put("c.e", 2);
SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, "", mock(Logger.class)); SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class));
String record[] = new String[3]; String record[] = new String[3];
boolean gotFields[] = new boolean[3]; boolean gotFields[] = new boolean[3];
@ -102,7 +101,7 @@ public class SimpleJsonRecordReaderTests extends ESTestCase {
fieldMap.put("g", 1); fieldMap.put("g", 1);
fieldMap.put("c.e", 2); fieldMap.put("c.e", 2);
SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, "", mock(Logger.class)); SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class));
String record[] = new String[3]; String record[] = new String[3];
boolean gotFields[] = new boolean[3]; boolean gotFields[] = new boolean[3];
@ -127,7 +126,7 @@ public class SimpleJsonRecordReaderTests extends ESTestCase {
JsonParser parser = createParser(data); JsonParser parser = createParser(data);
Map<String, Integer> fieldMap = createFieldMap(); Map<String, Integer> fieldMap = createFieldMap();
SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, "", mock(Logger.class)); SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class));
String record[] = new String[3]; String record[] = new String[3];
boolean gotFields[] = new boolean[3]; boolean gotFields[] = new boolean[3];
@ -149,7 +148,7 @@ public class SimpleJsonRecordReaderTests extends ESTestCase {
JsonParser parser = createParser(data); JsonParser parser = createParser(data);
Map<String, Integer> fieldMap = createFieldMap(); Map<String, Integer> fieldMap = createFieldMap();
SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, "", mock(Logger.class)); SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class));
String record[] = new String[3]; String record[] = new String[3];
boolean gotFields[] = new boolean[3]; boolean gotFields[] = new boolean[3];
@ -177,7 +176,7 @@ public class SimpleJsonRecordReaderTests extends ESTestCase {
JsonParser parser = createParser(builder.toString()); JsonParser parser = createParser(builder.toString());
Map<String, Integer> fieldMap = createFieldMap(); Map<String, Integer> fieldMap = createFieldMap();
SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, "", mock(Logger.class)); SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class));
ESTestCase.expectThrows(ElasticsearchParseException.class, () -> readUntilError(reader)); ESTestCase.expectThrows(ElasticsearchParseException.class, () -> readUntilError(reader));
} }
@ -191,34 +190,6 @@ public class SimpleJsonRecordReaderTests extends ESTestCase {
} }
} }
public void testRead_GivenDataEmbeddedInSource() throws Exception {
String data = "{\"took\": 1,\"hits\":{\"total\":1,\"hits\":["
+ "{\"_index\":\"foo\",\"_source\":{\"a\":1,\"b\":2,\"c\":3}},"
+ "{\"_index\":\"foo\",\"_source\":{\"a\":4,\"b\":5,\"c\":6}}"
+ "]}}\n";
JsonParser parser = createParser(data);
Map<String, Integer> fieldMap = createFieldMap();
SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, "_source", mock(Logger.class));
String record[] = new String[3];
boolean gotFields[] = new boolean[3];
assertEquals(3, reader.read(record, gotFields));
assertEquals("1", record[0]);
assertEquals("2", record[1]);
assertEquals("3", record[2]);
assertEquals(3, reader.read(record, gotFields));
assertEquals("4", record[0]);
assertEquals("5", record[1]);
assertEquals("6", record[2]);
assertEquals(-1, reader.read(record, gotFields));
}
public void testRead_givenControlCharacterInData() throws Exception { public void testRead_givenControlCharacterInData() throws Exception {
char controlChar = '\u0002'; char controlChar = '\u0002';
@ -229,7 +200,7 @@ public class SimpleJsonRecordReaderTests extends ESTestCase {
JsonParser parser = createParser(data); JsonParser parser = createParser(data);
Map<String, Integer> fieldMap = createFieldMap(); Map<String, Integer> fieldMap = createFieldMap();
SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, "", mock(Logger.class)); SimpleJsonRecordReader reader = new SimpleJsonRecordReader(parser, fieldMap, mock(Logger.class));
String record[] = new String[3]; String record[] = new String[3];
boolean gotFields[] = new boolean[3]; boolean gotFields[] = new boolean[3];

View File

@ -30,10 +30,10 @@ import org.elasticsearch.xpack.prelert.job.Detector;
import org.elasticsearch.xpack.prelert.job.Job; import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobStatus; import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.audit.Auditor; import org.elasticsearch.xpack.prelert.job.audit.Auditor;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractorFactory;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata; import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider; import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.scheduler.extractor.DataExtractor;
import org.elasticsearch.xpack.prelert.scheduler.extractor.DataExtractorFactory;
import org.junit.Before; import org.junit.Before;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
@ -104,8 +104,12 @@ public class ScheduledJobRunnerTests extends ESTestCase {
when(client.execute(same(PostDataAction.INSTANCE), any())).thenReturn(jobDataFuture); when(client.execute(same(PostDataAction.INSTANCE), any())).thenReturn(jobDataFuture);
when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture); when(client.execute(same(FlushJobAction.INSTANCE), any())).thenReturn(flushJobFuture);
scheduledJobRunner = new ScheduledJobRunner(threadPool, client, clusterService, jobProvider, dataExtractorFactory, scheduledJobRunner = new ScheduledJobRunner(threadPool, client, clusterService, jobProvider, () -> currentTime) {
() -> currentTime); @Override
DataExtractorFactory createDataExtractorFactory(SchedulerConfig schedulerConfig, Job job) {
return dataExtractorFactory;
}
};
when(jobProvider.audit(anyString())).thenReturn(auditor); when(jobProvider.audit(anyString())).thenReturn(auditor);
doAnswer(invocationOnMock -> { doAnswer(invocationOnMock -> {
@ -131,7 +135,7 @@ public class ScheduledJobRunnerTests extends ESTestCase {
.build()); .build());
DataExtractor dataExtractor = mock(DataExtractor.class); DataExtractor dataExtractor = mock(DataExtractor.class);
when(dataExtractorFactory.newExtractor(schedulerConfig, job)).thenReturn(dataExtractor); when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor);
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8"))); InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8")));
when(dataExtractor.next()).thenReturn(Optional.of(in)); when(dataExtractor.next()).thenReturn(Optional.of(in));
@ -140,7 +144,6 @@ public class ScheduledJobRunnerTests extends ESTestCase {
StartSchedulerAction.SchedulerTask task = mock(StartSchedulerAction.SchedulerTask.class); StartSchedulerAction.SchedulerTask task = mock(StartSchedulerAction.SchedulerTask.class);
scheduledJobRunner.run("scheduler1", 0L, 60000L, task, handler); scheduledJobRunner.run("scheduler1", 0L, 60000L, task, handler);
verify(dataExtractor).newSearch(eq(0L), eq(60000L), any());
verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME); verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME);
verify(threadPool, never()).schedule(any(), any(), any()); verify(threadPool, never()).schedule(any(), any(), any());
verify(client).execute(same(PostDataAction.INSTANCE), eq(new PostDataAction.Request("foo"))); verify(client).execute(same(PostDataAction.INSTANCE), eq(new PostDataAction.Request("foo")));
@ -164,7 +167,7 @@ public class ScheduledJobRunnerTests extends ESTestCase {
.build()); .build());
DataExtractor dataExtractor = mock(DataExtractor.class); DataExtractor dataExtractor = mock(DataExtractor.class);
when(dataExtractorFactory.newExtractor(schedulerConfig, job)).thenReturn(dataExtractor); when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor);
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
when(dataExtractor.next()).thenThrow(new RuntimeException("dummy")); when(dataExtractor.next()).thenThrow(new RuntimeException("dummy"));
when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts)); when(jobDataFuture.get()).thenReturn(new PostDataAction.Response(dataCounts));
@ -172,7 +175,6 @@ public class ScheduledJobRunnerTests extends ESTestCase {
StartSchedulerAction.SchedulerTask task = mock(StartSchedulerAction.SchedulerTask.class); StartSchedulerAction.SchedulerTask task = mock(StartSchedulerAction.SchedulerTask.class);
scheduledJobRunner.run("scheduler1", 0L, 60000L, task, handler); scheduledJobRunner.run("scheduler1", 0L, 60000L, task, handler);
verify(dataExtractor).newSearch(eq(0L), eq(60000L), any());
verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME); verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME);
verify(threadPool, never()).schedule(any(), any(), any()); verify(threadPool, never()).schedule(any(), any(), any());
verify(client, never()).execute(same(PostDataAction.INSTANCE), eq(new PostDataAction.Request("foo"))); verify(client, never()).execute(same(PostDataAction.INSTANCE), eq(new PostDataAction.Request("foo")));
@ -196,7 +198,7 @@ public class ScheduledJobRunnerTests extends ESTestCase {
.build()); .build());
DataExtractor dataExtractor = mock(DataExtractor.class); DataExtractor dataExtractor = mock(DataExtractor.class);
when(dataExtractorFactory.newExtractor(schedulerConfig, job)).thenReturn(dataExtractor); when(dataExtractorFactory.newExtractor(0L, 60000L)).thenReturn(dataExtractor);
when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false);
InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8"))); InputStream in = new ByteArrayInputStream("".getBytes(Charset.forName("utf-8")));
when(dataExtractor.next()).thenReturn(Optional.of(in)); when(dataExtractor.next()).thenReturn(Optional.of(in));
@ -206,7 +208,6 @@ public class ScheduledJobRunnerTests extends ESTestCase {
StartSchedulerAction.SchedulerTask task = new StartSchedulerAction.SchedulerTask(1, "type", "action", null, "scheduler1"); StartSchedulerAction.SchedulerTask task = new StartSchedulerAction.SchedulerTask(1, "type", "action", null, "scheduler1");
scheduledJobRunner.run("scheduler1", 0L, null, task, handler); scheduledJobRunner.run("scheduler1", 0L, null, task, handler);
verify(dataExtractor).newSearch(eq(0L), eq(60000L), any());
verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME); verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULED_RUNNER_THREAD_POOL_NAME);
if (cancelled) { if (cancelled) {
task.stop(); task.stop();

View File

@ -12,7 +12,8 @@ import org.elasticsearch.xpack.prelert.action.FlushJobAction;
import org.elasticsearch.xpack.prelert.action.PostDataAction; import org.elasticsearch.xpack.prelert.action.PostDataAction;
import org.elasticsearch.xpack.prelert.job.DataCounts; import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.audit.Auditor; import org.elasticsearch.xpack.prelert.job.audit.Auditor;
import org.elasticsearch.xpack.prelert.job.extraction.DataExtractor; import org.elasticsearch.xpack.prelert.scheduler.extractor.DataExtractor;
import org.elasticsearch.xpack.prelert.scheduler.extractor.DataExtractorFactory;
import org.junit.Before; import org.junit.Before;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
@ -25,6 +26,7 @@ import java.util.Optional;
import java.util.function.Supplier; import java.util.function.Supplier;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same; import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -35,6 +37,7 @@ import static org.mockito.Mockito.when;
public class ScheduledJobTests extends ESTestCase { public class ScheduledJobTests extends ESTestCase {
private Auditor auditor; private Auditor auditor;
private DataExtractorFactory dataExtractorFactory;
private DataExtractor dataExtractor; private DataExtractor dataExtractor;
private Client client; private Client client;
private ActionFuture<FlushJobAction.Response> flushJobFuture; private ActionFuture<FlushJobAction.Response> flushJobFuture;
@ -45,7 +48,9 @@ public class ScheduledJobTests extends ESTestCase {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void setup() throws Exception { public void setup() throws Exception {
auditor = mock(Auditor.class); auditor = mock(Auditor.class);
dataExtractorFactory = mock(DataExtractorFactory.class);
dataExtractor = mock(DataExtractor.class); dataExtractor = mock(DataExtractor.class);
when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor);
client = mock(Client.class); client = mock(Client.class);
ActionFuture<PostDataAction.Response> jobDataFuture = mock(ActionFuture.class); ActionFuture<PostDataAction.Response> jobDataFuture = mock(ActionFuture.class);
flushJobFuture = mock(ActionFuture.class); flushJobFuture = mock(ActionFuture.class);
@ -64,7 +69,7 @@ public class ScheduledJobTests extends ESTestCase {
ScheduledJob scheduledJob = createScheduledJob(1000, 500, -1, -1); ScheduledJob scheduledJob = createScheduledJob(1000, 500, -1, -1);
assertNull(scheduledJob.runLookBack(0L, 1000L)); assertNull(scheduledJob.runLookBack(0L, 1000L));
verify(dataExtractor).newSearch(eq(0L), eq(1000L), any()); verify(dataExtractorFactory).newExtractor(0L, 1000L);
FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id"); FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id");
flushRequest.setCalcInterim(true); flushRequest.setCalcInterim(true);
verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest)); verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest));
@ -78,7 +83,7 @@ public class ScheduledJobTests extends ESTestCase {
long next = scheduledJob.runLookBack(0L, null); long next = scheduledJob.runLookBack(0L, null);
assertEquals(2000 + frequencyMs + 100, next); assertEquals(2000 + frequencyMs + 100, next);
verify(dataExtractor).newSearch(eq(0L), eq(1500L), any()); verify(dataExtractorFactory).newExtractor(0L, 1500L);
FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id"); FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id");
flushRequest.setCalcInterim(true); flushRequest.setCalcInterim(true);
verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest)); verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest));
@ -100,7 +105,7 @@ public class ScheduledJobTests extends ESTestCase {
long next = scheduledJob.runLookBack(0L, null); long next = scheduledJob.runLookBack(0L, null);
assertEquals(10000 + frequencyMs + 100, next); assertEquals(10000 + frequencyMs + 100, next);
verify(dataExtractor).newSearch(eq(5000 + 1L), eq(currentTime - queryDelayMs), any()); verify(dataExtractorFactory).newExtractor(5000 + 1L, currentTime - queryDelayMs);
FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id"); FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id");
flushRequest.setCalcInterim(true); flushRequest.setCalcInterim(true);
verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest)); verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest));
@ -114,7 +119,7 @@ public class ScheduledJobTests extends ESTestCase {
long next = scheduledJob.runRealtime(); long next = scheduledJob.runRealtime();
assertEquals(currentTime + frequencyMs + 100, next); assertEquals(currentTime + frequencyMs + 100, next);
verify(dataExtractor).newSearch(eq(1000L + 1L), eq(currentTime - queryDelayMs), any()); verify(dataExtractorFactory).newExtractor(1000L + 1L, currentTime - queryDelayMs);
FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id"); FlushJobAction.Request flushRequest = new FlushJobAction.Request("_job_id");
flushRequest.setCalcInterim(true); flushRequest.setCalcInterim(true);
flushRequest.setAdvanceTime("1000"); flushRequest.setAdvanceTime("1000");
@ -122,7 +127,6 @@ public class ScheduledJobTests extends ESTestCase {
} }
public void testEmptyDataCount() throws Exception { public void testEmptyDataCount() throws Exception {
dataExtractor = mock(DataExtractor.class);
when(dataExtractor.hasNext()).thenReturn(false); when(dataExtractor.hasNext()).thenReturn(false);
ScheduledJob scheduledJob = createScheduledJob(1000, 500, -1, -1); ScheduledJob scheduledJob = createScheduledJob(1000, 500, -1, -1);
@ -130,7 +134,6 @@ public class ScheduledJobTests extends ESTestCase {
} }
public void testExtractionProblem() throws Exception { public void testExtractionProblem() throws Exception {
dataExtractor = mock(DataExtractor.class);
when(dataExtractor.hasNext()).thenReturn(true); when(dataExtractor.hasNext()).thenReturn(true);
when(dataExtractor.next()).thenThrow(new IOException()); when(dataExtractor.next()).thenThrow(new IOException());
@ -142,7 +145,7 @@ public class ScheduledJobTests extends ESTestCase {
ArgumentCaptor<Long> startTimeCaptor = ArgumentCaptor.forClass(Long.class); ArgumentCaptor<Long> startTimeCaptor = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Long> endTimeCaptor = ArgumentCaptor.forClass(Long.class); ArgumentCaptor<Long> endTimeCaptor = ArgumentCaptor.forClass(Long.class);
verify(dataExtractor, times(2)).newSearch(startTimeCaptor.capture(), endTimeCaptor.capture(), any()); verify(dataExtractorFactory, times(2)).newExtractor(startTimeCaptor.capture(), endTimeCaptor.capture());
assertEquals(0L, startTimeCaptor.getAllValues().get(0).longValue()); assertEquals(0L, startTimeCaptor.getAllValues().get(0).longValue());
assertEquals(1000L, startTimeCaptor.getAllValues().get(1).longValue()); assertEquals(1000L, startTimeCaptor.getAllValues().get(1).longValue());
assertEquals(1000L, endTimeCaptor.getAllValues().get(0).longValue()); assertEquals(1000L, endTimeCaptor.getAllValues().get(0).longValue());
@ -162,7 +165,7 @@ public class ScheduledJobTests extends ESTestCase {
ArgumentCaptor<Long> startTimeCaptor = ArgumentCaptor.forClass(Long.class); ArgumentCaptor<Long> startTimeCaptor = ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Long> endTimeCaptor = ArgumentCaptor.forClass(Long.class); ArgumentCaptor<Long> endTimeCaptor = ArgumentCaptor.forClass(Long.class);
verify(dataExtractor, times(2)).newSearch(startTimeCaptor.capture(), endTimeCaptor.capture(), any()); verify(dataExtractorFactory, times(2)).newExtractor(startTimeCaptor.capture(), endTimeCaptor.capture());
assertEquals(0L, startTimeCaptor.getAllValues().get(0).longValue()); assertEquals(0L, startTimeCaptor.getAllValues().get(0).longValue());
assertEquals(1000L, startTimeCaptor.getAllValues().get(1).longValue()); assertEquals(1000L, startTimeCaptor.getAllValues().get(1).longValue());
assertEquals(1000L, endTimeCaptor.getAllValues().get(0).longValue()); assertEquals(1000L, endTimeCaptor.getAllValues().get(0).longValue());
@ -173,7 +176,7 @@ public class ScheduledJobTests extends ESTestCase {
private ScheduledJob createScheduledJob(long frequencyMs, long queryDelayMs, long latestFinalBucketEndTimeMs, private ScheduledJob createScheduledJob(long frequencyMs, long queryDelayMs, long latestFinalBucketEndTimeMs,
long latestRecordTimeMs) { long latestRecordTimeMs) {
Supplier<Long> currentTimeSupplier = () -> currentTime; Supplier<Long> currentTimeSupplier = () -> currentTime;
return new ScheduledJob("_job_id", frequencyMs, queryDelayMs, dataExtractor, client, auditor, return new ScheduledJob("_job_id", frequencyMs, queryDelayMs, dataExtractorFactory, client, auditor,
currentTimeSupplier, latestFinalBucketEndTimeMs, latestRecordTimeMs); currentTimeSupplier, latestFinalBucketEndTimeMs, latestRecordTimeMs);
} }

View File

@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.scheduler.extractor;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHitField;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
public class SearchHitFieldExtractorTests extends ESTestCase {
public void testExtractTimeFieldGivenHitContainsNothing() throws IOException {
InternalSearchHit searchHit = new InternalSearchHit(42);
expectThrows(RuntimeException.class, () -> SearchHitFieldExtractor.extractTimeField(searchHit, "time"));
}
public void testExtractTimeFieldGivenSingleValueInFields() throws IOException {
InternalSearchHit searchHit = new InternalSearchHit(42);
Map<String, SearchHitField> fields = new HashMap<>();
fields.put("time", new InternalSearchHitField("time", Arrays.asList(3L)));
searchHit.fields(fields);
assertThat(SearchHitFieldExtractor.extractTimeField(searchHit, "time"), equalTo(3L));
}
public void testExtractTimeFieldGivenSingleValueInSource() throws IOException {
InternalSearchHit searchHit = new InternalSearchHit(42);
searchHit.sourceRef(new BytesArray("{\"time\":1482418307000}"));
assertThat(SearchHitFieldExtractor.extractTimeField(searchHit, "time"), equalTo(1482418307000L));
}
public void testExtractTimeFieldGivenArrayValue() throws IOException {
InternalSearchHit searchHit = new InternalSearchHit(42);
Map<String, SearchHitField> fields = new HashMap<>();
fields.put("time", new InternalSearchHitField("time", Arrays.asList(3L, 5L)));
searchHit.fields(fields);
expectThrows(RuntimeException.class, () -> SearchHitFieldExtractor.extractTimeField(searchHit, "time"));
}
public void testExtractTimeFieldGivenSingleNonLongValue() throws IOException {
InternalSearchHit searchHit = new InternalSearchHit(42);
Map<String, SearchHitField> fields = new HashMap<>();
fields.put("time", new InternalSearchHitField("time", Arrays.asList(3)));
searchHit.fields(fields);
expectThrows(RuntimeException.class, () -> SearchHitFieldExtractor.extractTimeField(searchHit, "time"));
}
}

View File

@ -0,0 +1,122 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.scheduler.extractor;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHitField;
import org.elasticsearch.test.ESTestCase;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
public class SearchHitToJsonProcessorTests extends ESTestCase {
public void testProcessGivenHitContainsNothing() throws IOException {
InternalSearchHit searchHit = new InternalSearchHit(42);
String json = searchHitToString(new String[] {"field_1", "field_2"}, searchHit);
assertThat(json, equalTo("{}"));
}
public void testProcessGivenHitContainsEmptySource() throws IOException {
InternalSearchHit searchHit = new InternalSearchHit(42);
searchHit.sourceRef(new BytesArray("{}"));
String json = searchHitToString(new String[] {"field_1", "field_2"}, searchHit);
assertThat(json, equalTo("{}"));
}
public void testProcessGivenHitContainsSingleValueInFields() throws IOException {
InternalSearchHit searchHit = new InternalSearchHit(42);
Map<String, SearchHitField> fields = new HashMap<>();
fields.put("field_1", new InternalSearchHitField("field_1", Arrays.asList(3)));
searchHit.fields(fields);
String json = searchHitToString(new String[] {"field_1"}, searchHit);
assertThat(json, equalTo("{\"field_1\":3}"));
}
public void testProcessGivenHitContainsArrayValueInFields() throws IOException {
InternalSearchHit searchHit = new InternalSearchHit(42);
Map<String, SearchHitField> fields = new HashMap<>();
fields.put("field_1", new InternalSearchHitField("field_1", Arrays.asList(3, 9)));
searchHit.fields(fields);
String json = searchHitToString(new String[] {"field_1"}, searchHit);
assertThat(json, equalTo("{\"field_1\":[3,9]}"));
}
public void testProcessGivenHitContainsSingleValueInSource() throws IOException {
InternalSearchHit searchHit = new InternalSearchHit(42);
String hitSource = "{\"field_1\":\"foo\"}";
searchHit.sourceRef(new BytesArray(hitSource));
String json = searchHitToString(new String[] {"field_1"}, searchHit);
assertThat(json, equalTo("{\"field_1\":\"foo\"}"));
}
public void testProcessGivenHitContainsArrayValueInSource() throws IOException {
InternalSearchHit searchHit = new InternalSearchHit(42);
String hitSource = "{\"field_1\":[\"foo\",\"bar\"]}";
searchHit.sourceRef(new BytesArray(hitSource));
String json = searchHitToString(new String[] {"field_1"}, searchHit);
assertThat(json, equalTo("{\"field_1\":[\"foo\",\"bar\"]}"));
}
public void testProcessGivenHitContainsFieldsAndSource() throws IOException {
InternalSearchHit searchHit = new InternalSearchHit(42);
String hitSource = "{\"field_1\":\"foo\"}";
searchHit.sourceRef(new BytesArray(hitSource));
Map<String, SearchHitField> fields = new HashMap<>();
fields.put("field_2", new InternalSearchHitField("field_2", Arrays.asList("bar")));
searchHit.fields(fields);
String json = searchHitToString(new String[] {"field_1", "field_2"}, searchHit);
assertThat(json, equalTo("{\"field_1\":\"foo\",\"field_2\":\"bar\"}"));
}
public void testProcessGivenMultipleHits() throws IOException {
InternalSearchHit searchHit1 = new InternalSearchHit(42);
Map<String, SearchHitField> fields = new HashMap<>();
fields.put("field_1", new InternalSearchHitField("field_1", Arrays.asList(3)));
searchHit1.fields(fields);
InternalSearchHit searchHit2 = new InternalSearchHit(42);
fields = new HashMap<>();
fields.put("field_1", new InternalSearchHitField("field_1", Arrays.asList(5)));
searchHit2.fields(fields);
String json = searchHitToString(new String[] {"field_1"}, searchHit1, searchHit2);
assertThat(json, equalTo("{\"field_1\":3} {\"field_1\":5}"));
}
private String searchHitToString(String[] fields, SearchHit... searchHits) throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try (SearchHitToJsonProcessor hitProcessor = new SearchHitToJsonProcessor(fields, outputStream)) {
for (int i = 0; i < searchHits.length; i++) {
hitProcessor.process(searchHits[i]);
}
}
return outputStream.toString(StandardCharsets.UTF_8.name());
}
}

View File

@ -0,0 +1,295 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.scheduler.extractor.scroll;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHitField;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ScrollDataExtractorTests extends ESTestCase {
private Client client;
private List<SearchRequestBuilder> capturedSearchRequests;
private List<String> capturedContinueScrollIds;
private List<String> capturedClearScrollIds;
private String jobId;
private List<String> jobFields;
private String timeField;
private List<String> types;
private List<String> indexes;
private QueryBuilder query;
private AggregatorFactories.Builder aggregations;
private List<SearchSourceBuilder.ScriptField> scriptFields;
private int scrollSize;
private class TestDataExtractor extends ScrollDataExtractor {
private SearchResponse nextResponse;
public TestDataExtractor(long start, long end) {
super(client, createContext(start, end));
}
@Override
protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) {
capturedSearchRequests.add(searchRequestBuilder);
return nextResponse;
}
@Override
protected SearchResponse executeSearchScrollRequest(String scrollId) {
capturedContinueScrollIds.add(scrollId);
return nextResponse;
}
@Override
void clearScroll(String scrollId) {
capturedClearScrollIds.add(scrollId);
}
void setNextResponse(SearchResponse searchResponse) {
nextResponse = searchResponse;
}
}
@Before
public void setUpTests() {
client = mock(Client.class);
capturedSearchRequests = new ArrayList<>();
capturedContinueScrollIds = new ArrayList<>();
capturedClearScrollIds = new ArrayList<>();
timeField = "time";
jobId = "test-job";
jobFields = Arrays.asList(timeField, "field_1");
indexes = Arrays.asList("index-1", "index-2");
types = Arrays.asList("type-1", "type-2");
query = QueryBuilders.matchAllQuery();
aggregations = null;
scriptFields = Collections.emptyList();
scrollSize = 1000;
}
public void testSinglePageExtraction() throws IOException {
TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L);
SearchResponse response = createSearchResponse(
Arrays.asList(1100L, 1200L),
Arrays.asList("a1", "a2"),
Arrays.asList("b1", "b2")
);
extractor.setNextResponse(response);
assertThat(extractor.hasNext(), is(true));
Optional<InputStream> stream = extractor.next();
assertThat(stream.isPresent(), is(true));
String expectedStream = "{\"time\":1100,\"field_1\":\"a1\"} {\"time\":1200,\"field_1\":\"a2\"}";
assertThat(asString(stream.get()), equalTo(expectedStream));
extractor.setNextResponse(createEmptySearchResponse());
assertThat(extractor.hasNext(), is(true));
assertThat(extractor.next().isPresent(), is(false));
assertThat(extractor.hasNext(), is(false));
assertThat(capturedSearchRequests.size(), equalTo(1));
String searchRequest = capturedSearchRequests.get(0).toString().replaceAll("\\s", "");
assertThat(searchRequest, containsString("\"size\":1000"));
assertThat(searchRequest, containsString("\"query\":{\"bool\":{\"filter\":[{\"match_all\":{\"boost\":1.0}}," +
"{\"range\":{\"time\":{\"from\":1000,\"to\":2000,\"include_lower\":true,\"include_upper\":false," +
"\"format\":\"epoch_millis\",\"boost\":1.0}}}]"));
assertThat(searchRequest, containsString("\"sort\":[{\"time\":{\"order\":\"asc\"}}]"));
assertThat(capturedContinueScrollIds.size(), equalTo(1));
assertThat(capturedContinueScrollIds.get(0), equalTo(response.getScrollId()));
assertThat(capturedClearScrollIds.isEmpty(), is(true));
}
public void testMultiplePageExtraction() throws IOException {
TestDataExtractor extractor = new TestDataExtractor(1000L, 10000L);
SearchResponse response1 = createSearchResponse(
Arrays.asList(1000L, 2000L),
Arrays.asList("a1", "a2"),
Arrays.asList("b1", "b2")
);
extractor.setNextResponse(response1);
assertThat(extractor.hasNext(), is(true));
Optional<InputStream> stream = extractor.next();
assertThat(stream.isPresent(), is(true));
String expectedStream = "{\"time\":1000,\"field_1\":\"a1\"} {\"time\":2000,\"field_1\":\"a2\"}";
assertThat(asString(stream.get()), equalTo(expectedStream));
SearchResponse response2 = createSearchResponse(
Arrays.asList(3000L, 4000L),
Arrays.asList("a3", "a4"),
Arrays.asList("b3", "b4")
);
extractor.setNextResponse(response2);
assertThat(extractor.hasNext(), is(true));
stream = extractor.next();
assertThat(stream.isPresent(), is(true));
expectedStream = "{\"time\":3000,\"field_1\":\"a3\"} {\"time\":4000,\"field_1\":\"a4\"}";
assertThat(asString(stream.get()), equalTo(expectedStream));
extractor.setNextResponse(createEmptySearchResponse());
assertThat(extractor.hasNext(), is(true));
assertThat(extractor.next().isPresent(), is(false));
assertThat(extractor.hasNext(), is(false));
assertThat(capturedSearchRequests.size(), equalTo(1));
String searchRequest1 = capturedSearchRequests.get(0).toString().replaceAll("\\s", "");
assertThat(searchRequest1, containsString("\"size\":1000"));
assertThat(searchRequest1, containsString("\"query\":{\"bool\":{\"filter\":[{\"match_all\":{\"boost\":1.0}}," +
"{\"range\":{\"time\":{\"from\":1000,\"to\":10000,\"include_lower\":true,\"include_upper\":false," +
"\"format\":\"epoch_millis\",\"boost\":1.0}}}]"));
assertThat(searchRequest1, containsString("\"sort\":[{\"time\":{\"order\":\"asc\"}}]"));
assertThat(capturedContinueScrollIds.size(), equalTo(2));
assertThat(capturedContinueScrollIds.get(0), equalTo(response1.getScrollId()));
assertThat(capturedContinueScrollIds.get(1), equalTo(response2.getScrollId()));
assertThat(capturedClearScrollIds.isEmpty(), is(true));
}
public void testMultiplePageExtractionGivenCancel() throws IOException {
TestDataExtractor extractor = new TestDataExtractor(1000L, 10000L);
SearchResponse response1 = createSearchResponse(
Arrays.asList(1000L, 2000L),
Arrays.asList("a1", "a2"),
Arrays.asList("b1", "b2")
);
extractor.setNextResponse(response1);
assertThat(extractor.hasNext(), is(true));
Optional<InputStream> stream = extractor.next();
assertThat(stream.isPresent(), is(true));
String expectedStream = "{\"time\":1000,\"field_1\":\"a1\"} {\"time\":2000,\"field_1\":\"a2\"}";
assertThat(asString(stream.get()), equalTo(expectedStream));
extractor.cancel();
SearchResponse response2 = createSearchResponse(
Arrays.asList(2000L, 3000L),
Arrays.asList("a3", "a4"),
Arrays.asList("b3", "b4")
);
extractor.setNextResponse(response2);
assertThat(extractor.hasNext(), is(true));
stream = extractor.next();
assertThat(stream.isPresent(), is(true));
expectedStream = "{\"time\":2000,\"field_1\":\"a3\"}";
assertThat(asString(stream.get()), equalTo(expectedStream));
assertThat(extractor.hasNext(), is(false));
assertThat(capturedClearScrollIds.size(), equalTo(1));
assertThat(capturedClearScrollIds.get(0), equalTo(response2.getScrollId()));
}
public void testExtractionGivenInitSearchResponseHasError() throws IOException {
TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L);
extractor.setNextResponse(createErrorResponse());
assertThat(extractor.hasNext(), is(true));
expectThrows(IOException.class, () -> extractor.next());
}
public void testExtractionGivenContinueScrollResponseHasError() throws IOException {
TestDataExtractor extractor = new TestDataExtractor(1000L, 10000L);
SearchResponse response1 = createSearchResponse(
Arrays.asList(1000L, 2000L),
Arrays.asList("a1", "a2"),
Arrays.asList("b1", "b2")
);
extractor.setNextResponse(response1);
assertThat(extractor.hasNext(), is(true));
Optional<InputStream> stream = extractor.next();
assertThat(stream.isPresent(), is(true));
extractor.setNextResponse(createErrorResponse());
assertThat(extractor.hasNext(), is(true));
expectThrows(IOException.class, () -> extractor.next());
}
private ScrollDataExtractorContext createContext(long start, long end) {
return new ScrollDataExtractorContext(jobId, jobFields, timeField, indexes, types, query, aggregations, scriptFields, scrollSize,
start, end);
}
private SearchResponse createEmptySearchResponse() {
return createSearchResponse(Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
}
private SearchResponse createSearchResponse(List<Long> timestamps, List<String> field1Values, List<String> field2Values) {
SearchResponse searchResponse = mock(SearchResponse.class);
when(searchResponse.status()).thenReturn(RestStatus.OK);
when(searchResponse.getScrollId()).thenReturn(randomAsciiOfLength(1000));
SearchHits searchHits = mock(SearchHits.class);
List<SearchHit> hits = new ArrayList<>();
for (int i = 0; i < timestamps.size(); i++) {
InternalSearchHit hit = new InternalSearchHit(randomInt());
Map<String, SearchHitField> fields = new HashMap<>();
fields.put(timeField, new InternalSearchHitField("time", Arrays.asList(timestamps.get(i))));
fields.put("field_1", new InternalSearchHitField("field_1", Arrays.asList(field1Values.get(i))));
fields.put("field_2", new InternalSearchHitField("field_2", Arrays.asList(field2Values.get(i))));
hit.fields(fields);
hits.add(hit);
}
when(searchHits.getHits()).thenReturn(hits.toArray(new SearchHit[hits.size()]));
when(searchHits.hits()).thenReturn(hits.toArray(new SearchHit[hits.size()]));
when(searchResponse.getHits()).thenReturn(searchHits);
return searchResponse;
}
private SearchResponse createErrorResponse() {
SearchResponse searchResponse = mock(SearchResponse.class);
when(searchResponse.status()).thenReturn(RestStatus.INTERNAL_SERVER_ERROR);
return searchResponse;
}
private static String asString(InputStream inputStream) throws IOException {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
return reader.lines().collect(Collectors.joining("\n"));
}
}
}

View File

@ -1,693 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.scheduler.http;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import static org.mockito.Mockito.mock;
public class ElasticsearchDataExtractorTests extends ESTestCase {
private static final List<String> INDEXES = Arrays.asList("index-*");
private static final List<String> TYPES = Arrays.asList("dataType");
private static final String SEARCH = "{\"match_all\":{}}";
private static final String TIME_FIELD = "time";
private static final String CLEAR_SCROLL_RESPONSE = "{}";
private Logger jobLogger;
private String aggregations;
private String scriptFields;
private ElasticsearchDataExtractor extractor;
@Before
public void setUpTests() throws IOException {
jobLogger = mock(Logger.class);
}
public void testDataExtraction() throws IOException {
String initialResponse = "{" + "\"_scroll_id\":\"c2Nhbjs2OzM0NDg1ODpzRlBLc0FXNlNyNm5JWUc1\"," + "\"took\":17,"
+ "\"timed_out\":false," + "\"_shards\":{" + " \"total\":1," + " \"successful\":1," + " \"failed\":0" + "},"
+ "\"hits\":{" + " \"total\":1437," + " \"max_score\":null," + " \"hits\":[" + " \"_index\":\"dataIndex\","
+ " \"_type\":\"dataType\"," + " \"_id\":\"1403481600\"," + " \"_score\":null," + " \"_source\":{"
+ " \"id\":\"1403481600\"" + " }" + " ]" + "}" + "}";
String scrollResponse = "{" + "\"_scroll_id\":\"secondScrollId\"," + "\"took\":8," + "\"timed_out\":false," + "\"_shards\":{"
+ " \"total\":1," + " \"successful\":1," + " \"failed\":0" + "}," + "\"hits\":{" + " \"total\":1437,"
+ " \"max_score\":null," + " \"hits\":[" + " \"_index\":\"dataIndex\"," + " \"_type\":\"dataType\","
+ " \"_id\":\"1403782200\"," + " \"_score\":null," + " \"_source\":{" + " \"id\":\"1403782200\"" + " }"
+ " ]" + "}" + "}";
String scrollEndResponse = "{" + "\"_scroll_id\":\"thirdScrollId\"," + "\"took\":8," + "\"timed_out\":false," + "\"_shards\":{"
+ " \"total\":1," + " \"successful\":1," + " \"failed\":0" + "}," + "\"hits\":{" + " \"total\":1437,"
+ " \"max_score\":null," + " \"hits\":[]" + "}" + "}";
List<HttpResponse> responses = Arrays.asList(new HttpResponse(toStream(initialResponse), 200),
new HttpResponse(toStream(scrollResponse), 200), new HttpResponse(toStream(scrollEndResponse), 200));
MockHttpRequester requester = new MockHttpRequester(responses);
createExtractor(requester);
extractor.newSearch(1400000000L, 1403600000L, jobLogger);
assertTrue(extractor.hasNext());
assertEquals(initialResponse, streamToString(extractor.next().get()));
assertTrue(extractor.hasNext());
assertEquals(scrollResponse, streamToString(extractor.next().get()));
assertTrue(extractor.hasNext());
assertFalse(extractor.next().isPresent());
assertFalse(extractor.hasNext());
requester.assertEqualRequestsToResponses();
requester.assertResponsesHaveBeenConsumed();
RequestParams firstRequestParams = requester.getGetRequestParams(0);
assertEquals("http://localhost:9200/index-*/dataType/_search?scroll=60m&size=1000", firstRequestParams.url);
String expectedSearchBody = "{" + " \"sort\": [" + " {\"time\": {\"order\": \"asc\"}}" + " ]," + " \"query\": {"
+ " \"bool\": {" + " \"filter\": [" + " {\"match_all\":{}}," + " {" + " \"range\": {"
+ " \"time\": {" + " \"gte\": \"1970-01-17T04:53:20.000Z\","
+ " \"lt\": \"1970-01-17T05:53:20.000Z\"," + " \"format\": \"date_time\"" + " }"
+ " }" + " }" + " ]" + " }" + " }" + "}";
assertEquals(expectedSearchBody.replaceAll(" ", ""), firstRequestParams.requestBody.replaceAll(" ", ""));
RequestParams secondRequestParams = requester.getGetRequestParams(1);
assertEquals("http://localhost:9200/_search/scroll?scroll=60m", secondRequestParams.url);
assertEquals("c2Nhbjs2OzM0NDg1ODpzRlBLc0FXNlNyNm5JWUc1", secondRequestParams.requestBody);
RequestParams thirdRequestParams = requester.getGetRequestParams(2);
assertEquals("http://localhost:9200/_search/scroll?scroll=60m", thirdRequestParams.url);
assertEquals("secondScrollId", thirdRequestParams.requestBody);
assertEquals("http://localhost:9200/_search/scroll", requester.getDeleteRequestParams(0).url);
assertEquals("{\"scroll_id\":[\"thirdScrollId\"]}", requester.getDeleteRequestParams(0).requestBody);
assertEquals(1, requester.deleteRequestParams.size());
}
public void testDataExtraction_GivenInitialResponseContainsLongScrollId() throws IOException {
StringBuilder scrollId = new StringBuilder();
for (int i = 0; i < 300 * 1024; i++) {
scrollId.append("a");
}
String initialResponse = "{" + "\"_scroll_id\":\"" + scrollId + "\"," + "\"took\":17," + "\"timed_out\":false," + "\"_shards\":{"
+ " \"total\":1," + " \"successful\":1," + " \"failed\":0" + "}," + "\"hits\":{" + " \"total\":1437,"
+ " \"max_score\":null," + " \"hits\":[" + " \"_index\":\"dataIndex\"," + " \"_type\":\"dataType\","
+ " \"_id\":\"1403481600\"," + " \"_score\":null," + " \"_source\":{" + " \"id\":\"1403481600\"" + " }"
+ " ]" + "}" + "}";
String scrollEndResponse = "{" + "\"_scroll_id\":\"" + scrollId + "\"," + "\"took\":8," + "\"timed_out\":false," + "\"_shards\":{"
+ " \"total\":1," + " \"successful\":1," + " \"failed\":0" + "}," + "\"hits\":{" + " \"total\":1437,"
+ " \"max_score\":null," + " \"hits\":[]" + "}" + "}";
List<HttpResponse> responses = Arrays.asList(new HttpResponse(toStream(initialResponse), 200),
new HttpResponse(toStream(scrollEndResponse), 200));
MockHttpRequester requester = new MockHttpRequester(responses);
createExtractor(requester);
extractor.newSearch(1400000000L, 1403600000L, jobLogger);
assertTrue(extractor.hasNext());
extractor.next();
assertTrue(extractor.hasNext());
extractor.next();
assertFalse(extractor.hasNext());
assertEquals(scrollId.toString(), requester.getGetRequestParams(1).requestBody);
}
public void testDataExtraction_GivenInitialResponseContainsNoHitsAndNoScrollId() throws IOException {
String initialResponse = "{}";
HttpResponse httpGetResponse = new HttpResponse(toStream(initialResponse), 200);
List<HttpResponse> responses = Arrays.asList(httpGetResponse);
MockHttpRequester requester = new MockHttpRequester(responses);
createExtractor(requester);
extractor.newSearch(1400000000L, 1403600000L, jobLogger);
assertTrue(extractor.hasNext());
IOException e = expectThrows(IOException.class, () -> extractor.next());
assertEquals("Field '_scroll_id' was expected but not found in first 2 bytes of response:\n{}", e.getMessage());
}
public void testDataExtraction_GivenInitialResponseContainsHitsButNoScrollId() throws IOException {
String initialResponse = "{" + "\"took\":17," + "\"timed_out\":false," + "\"_shards\":{" + " \"total\":1," + " \"successful\":1,"
+ " \"failed\":0" + "}," + "\"hits\":{" + " \"total\":1437," + " \"max_score\":null," + " \"hits\":["
+ " \"_index\":\"dataIndex\"," + " \"_type\":\"dataType\"," + " \"_id\":\"1403481600\"," + " \"_score\":null,"
+ " \"_source\":{" + " \"id\":\"1403481600\"" + " }" + " ]" + "}" + "}";
HttpResponse httpGetResponse = new HttpResponse(toStream(initialResponse), 200);
List<HttpResponse> responses = Arrays.asList(httpGetResponse);
MockHttpRequester requester = new MockHttpRequester(responses);
createExtractor(requester);
extractor.newSearch(1400000000L, 1403600000L, jobLogger);
assertTrue(extractor.hasNext());
IOException e = expectThrows(IOException.class, () -> extractor.next());
assertEquals("Field '_scroll_id' was expected but not found in first 272 bytes of response:\n" + initialResponse, e.getMessage());
}
public void testDataExtraction_GivenInitialResponseContainsTooLongScrollId() throws IOException {
StringBuilder scrollId = new StringBuilder();
for (int i = 0; i < 1024 * 1024; i++) {
scrollId.append("a");
}
String initialResponse = "{" + "\"_scroll_id\":\"" + scrollId + "\"," + "\"took\":17," + "\"timed_out\":false," + "\"_shards\":{"
+ " \"total\":1," + " \"successful\":1," + " \"failed\":0" + "}," + "\"hits\":{" + " \"total\":1437,"
+ " \"max_score\":null," + " \"hits\":[" + " \"_index\":\"dataIndex\"," + " \"_type\":\"dataType\","
+ " \"_id\":\"1403481600\"," + " \"_score\":null," + " \"_source\":{" + " \"id\":\"1403481600\"" + " }"
+ " ]" + "}" + "}";
HttpResponse httpGetResponse = new HttpResponse(toStream(initialResponse), 200);
List<HttpResponse> responses = Arrays.asList(httpGetResponse);
MockHttpRequester requester = new MockHttpRequester(responses);
createExtractor(requester);
extractor.newSearch(1400000000L, 1403600000L, jobLogger);
assertTrue(extractor.hasNext());
IOException e = expectThrows(IOException.class, () -> extractor.next());
assertEquals("Field '_scroll_id' was expected but not found in first 1048576 bytes of response:\n" + initialResponse,
e.getMessage());
}
public void testDataExtraction_GivenInitialResponseDoesNotReturnOk() throws IOException {
String initialResponse = "{}";
List<HttpResponse> responses = Arrays.asList(new HttpResponse(toStream(initialResponse), 500));
MockHttpRequester requester = new MockHttpRequester(responses);
createExtractor(requester);
extractor.newSearch(1400000000L, 1403600000L, jobLogger);
assertTrue(extractor.hasNext());
IOException e = expectThrows(IOException.class, () -> extractor.next());
assertEquals(
"Request 'http://localhost:9200/index-*/dataType/_search?scroll=60m&size=1000' failed with status code: 500."
+ " Response was:\n{}",
e.getMessage());
}
public void testDataExtraction_GivenScrollResponseDoesNotReturnOk() throws IOException {
String initialResponse = "{" + "\"_scroll_id\":\"c2Nhbjs2OzM0NDg1ODpzRlBLc0FXNlNyNm5JWUc1\"," + "\"hits\":[..." + "}";
List<HttpResponse> responses = Arrays.asList(new HttpResponse(toStream(initialResponse), 200),
new HttpResponse(toStream("{}"), 500));
MockHttpRequester requester = new MockHttpRequester(responses);
createExtractor(requester);
extractor.newSearch(1400000000L, 1403600000L, jobLogger);
assertTrue(extractor.hasNext());
assertEquals(initialResponse, streamToString(extractor.next().get()));
assertTrue(extractor.hasNext());
IOException e = expectThrows(IOException.class, () -> extractor.next());
assertEquals("Request 'http://localhost:9200/_search/scroll?scroll=60m' with scroll id 'c2Nhbjs2OzM0NDg1ODpzRlBLc0FXNlNyNm5JWUc1' "
+ "failed with status code: 500. Response was:\n{}", e.getMessage());
}
public void testNext_ThrowsGivenHasNotNext() throws IOException {
String initialResponse = "{" + "\"_scroll_id\":\"c2Nhbjs2OzM0NDg1ODpzRlBLc0FXNlNyNm5JWUc1\"," + "\"hits\":[]" + "}";
List<HttpResponse> responses = Arrays.asList(new HttpResponse(toStream(initialResponse), 200));
MockHttpRequester requester = new MockHttpRequester(responses);
createExtractor(requester);
extractor.newSearch(1400000000L, 1403600000L, jobLogger);
assertTrue(extractor.hasNext());
assertFalse(extractor.next().isPresent());
assertFalse(extractor.hasNext());
expectThrows(NoSuchElementException.class, () -> extractor.next());
}
public void testDataExtractionWithAggregations() throws IOException {
aggregations = "{\"my-aggs\": {\"terms\":{\"field\":\"foo\"}}}";
String initialResponse = "{" + "\"_scroll_id\":\"r2d2bjs2OzM0NDg1ODpzRlBLc0FXNlNyNm5JWUc1\"," + "\"took\":17,"
+ "\"timed_out\":false," + "\"_shards\":{" + " \"total\":1," + " \"successful\":1," + " \"failed\":0" + "},"
+ "\"aggregations\":{" + " \"my-aggs\":{" + " \"buckets\":[" + " {" + " \"key\":\"foo\"" + " }"
+ " ]" + " }" + "}" + "}";
List<HttpResponse> responses = Arrays.asList(new HttpResponse(toStream(initialResponse), 200));
MockHttpRequester requester = new MockHttpRequester(responses);
createExtractor(requester);
extractor.newSearch(1400000000L, 1403600000L, jobLogger);
assertTrue(extractor.hasNext());
assertEquals(initialResponse, streamToString(extractor.next().get()));
assertFalse(extractor.next().isPresent());
assertFalse(extractor.hasNext());
requester.assertEqualRequestsToResponses();
requester.assertResponsesHaveBeenConsumed();
assertEquals(1, requester.getRequestParams.size());
RequestParams requestParams = requester.getGetRequestParams(0);
assertEquals("http://localhost:9200/index-*/dataType/_search?scroll=60m&size=0", requestParams.url);
String expectedSearchBody = "{" + " \"sort\": [" + " {\"time\": {\"order\": \"asc\"}}" + " ]," + " \"query\": {"
+ " \"bool\": {" + " \"filter\": [" + " {\"match_all\":{}}," + " {" + " \"range\": {"
+ " \"time\": {" + " \"gte\": \"1970-01-17T04:53:20.000Z\","
+ " \"lt\": \"1970-01-17T05:53:20.000Z\"," + " \"format\": \"date_time\"" + " }"
+ " }" + " }" + " ]" + " }" + " }," + " \"aggs\":{\"my-aggs\": {\"terms\":{\"field\":\"foo\"}}}"
+ "}";
assertEquals(expectedSearchBody.replaceAll(" ", ""), requestParams.requestBody.replaceAll(" ", ""));
assertEquals("http://localhost:9200/_search/scroll", requester.getDeleteRequestParams(0).url);
assertEquals("{\"scroll_id\":[\"r2d2bjs2OzM0NDg1ODpzRlBLc0FXNlNyNm5JWUc1\"]}", requester.getDeleteRequestParams(0).requestBody);
assertEquals(1, requester.deleteRequestParams.size());
}
public void testDataExtractionWithAggregations_GivenResponseHasEmptyBuckets() throws IOException {
aggregations = "{\"aggs\":{\"my-aggs\": {\"terms\":{\"field\":\"foo\"}}}}";
String initialResponse = "{" + "\"_scroll_id\":\"r2d2bjs2OzM0NDg1ODpzRlBLc0FXNlNyNm5JWUc1\"," + "\"took\":17,"
+ "\"timed_out\":false," + "\"_shards\":{" + " \"total\":1," + " \"successful\":1," + " \"failed\":0" + "},"
+ "\"aggregations\":{" + " \"my-aggs\":{" + " \"buckets\":[]" + " }" + "}" + "}";
List<HttpResponse> responses = Arrays.asList(new HttpResponse(toStream(initialResponse), 200));
MockHttpRequester requester = new MockHttpRequester(responses);
createExtractor(requester);
extractor.newSearch(1400000000L, 1403600000L, jobLogger);
assertTrue(extractor.hasNext());
assertFalse(extractor.next().isPresent());
assertFalse(extractor.hasNext());
requester.assertEqualRequestsToResponses();
requester.assertResponsesHaveBeenConsumed();
assertEquals(1, requester.getRequestParams.size());
RequestParams requestParams = requester.getGetRequestParams(0);
assertEquals("http://localhost:9200/index-*/dataType/_search?scroll=60m&size=0", requestParams.url);
}
public void testChunkedDataExtraction() throws IOException {
String dataSummaryResponse = "{" + "\"took\":17," + "\"timed_out\":false," + "\"_shards\":{" + " \"total\":1,"
+ " \"successful\":1," + " \"failed\":0" + "}," + "\"hits\":{" + " \"total\":10000," + " \"max_score\":null,"
+ " \"hits\":[" + " \"_index\":\"dataIndex\"," + " \"_type\":\"dataType\"," + " \"_id\":\"1403481600\","
+ " \"_score\":null," + " \"_source\":{" + " \"id\":\"1403481600\"" + " }" + " ]" + "},"
+ "\"aggregations\":{" + "\"earliestTime\":{" + "\"value\":1400000001000," + "\"value_as_string\":\"2014-05-13T16:53:21Z\""
+ "}," + "\"latestTime\":{" + "\"value\":1400007201000," + "\"value_as_string\":\"2014-05-13T17:16:01Z\"" + "}" + "}" + "}";
String indexResponse = "{" + "\"dataIndex\":{" + " \"settings\":{" + " \"index\":{" + " \"creation_date\":0,"
+ " \"number_of_shards\":\"5\"," + " \"number_of_replicas\":\"1\"" + " }" + " }" + "}";
String initialResponse1 = "{" + "\"_scroll_id\":\"scrollId_1\"," + "\"took\":17," + "\"timed_out\":false," + "\"_shards\":{"
+ " \"total\":1," + " \"successful\":1," + " \"failed\":0" + "}," + "\"hits\":{" + " \"total\":10000,"
+ " \"max_score\":null," + " \"hits\":[" + " \"_index\":\"dataIndex\"," + " \"_type\":\"dataType\","
+ " \"_id\":\"1403481600\"," + " \"_score\":null," + " \"_source\":{" + " \"id\":\"1403481600\"" + " }"
+ " ]" + "}" + "}";
String continueResponse1 = "{" + "\"_scroll_id\":\"scrollId_2\"," + "\"took\":8," + "\"timed_out\":false," + "\"_shards\":{"
+ " \"total\":1," + " \"successful\":1," + " \"failed\":0" + "}," + "\"hits\":{" + " \"total\":10000,"
+ " \"max_score\":null," + " \"hits\":[" + " \"_index\":\"dataIndex\"," + " \"_type\":\"dataType\","
+ " \"_id\":\"1403782200\"," + " \"_score\":null," + " \"_source\":{" + " \"id\":\"1403782200\"" + " }"
+ " ]" + "}" + "}";
String endResponse1 = "{" + "\"_scroll_id\":\"scrollId_3\"," + "\"took\":8," + "\"timed_out\":false," + "\"_shards\":{"
+ " \"total\":1," + " \"successful\":1," + " \"failed\":0" + "}," + "\"hits\":{" + " \"total\":10000,"
+ " \"max_score\":null," + " \"hits\":[]" + "}" + "}";
String initialResponse2 = "{" + "\"_scroll_id\":\"scrollId_4\"," + "\"hits\":{" + " \"total\":10000," + " \"hits\":["
+ " \"_index\":\"dataIndex\"" + " ]" + "}" + "}";
String endResponse2 = "{" + "\"_scroll_id\":\"scrollId_5\"," + "\"hits\":[]" + "}";
String initialResponse3 = "{" + "\"_scroll_id\":\"scrollId_6\"," + "\"hits\":[]" + "}";
String dataSummaryResponse2 = "{" + "\"took\":17," + "\"timed_out\":false," + "\"_shards\":{" + " \"total\":1,"
+ " \"successful\":1," + " \"failed\":0" + "}," + "\"hits\":{" + " \"total\":1," + " \"max_score\":null,"
+ " \"hits\":[" + " \"_index\":\"dataIndex\"," + " \"_type\":\"dataType\"," + " \"_id\":\"1403481600\","
+ " \"_score\":null," + " \"_source\":{" + " \"id\":\"1403481600\"" + " }" + " ]" + "},"
+ "\"aggregations\":{" + "\"earliestTime\":{" + "\"value\":1400007201000," + "\"value_as_string\":\"2014-05-13T17:16:01Z\""
+ "}," + "\"latestTime\":{" + "\"value\":1400007201000," + "\"value_as_string\":\"2014-05-13T17:16:01Z\"" + "}" + "}" + "}";
String initialResponse4 = "{" + "\"_scroll_id\":\"scrollId_7\"," + "\"hits\":{" + " \"total\":1," + " \"hits\":["
+ " \"_index\":\"dataIndex\"" + " ]" + "}" + "}";
String endResponse4 = "{" + "\"_scroll_id\":\"scrollId_8\"," + "\"hits\":[]" + "}";
String response5 = "{" + "\"_scroll_id\":\"scrollId_9\"," + "\"hits\":{" + " \"total\":0," + " \"hits\":[]" + "}" + "}";
String finalDataSummaryResponse = "{" + "\"took\":17," + "\"timed_out\":false," + "\"_shards\":{" + " \"total\":0,"
+ " \"successful\":0," + " \"failed\":0" + "}," + "\"hits\":{" + " \"total\":0," + " \"max_score\":null,"
+ " \"hits\":[]" + "}," + "\"aggregations\":{" + "\"earliestTime\":{" + "\"value\": null" + "}," + "\"latestTime\":{"
+ "\"value\": null" + "}" + "}" + "}";
List<HttpResponse> responses = Arrays.asList(new HttpResponse(toStream(dataSummaryResponse), 200),
new HttpResponse(toStream(indexResponse), 200), new HttpResponse(toStream(initialResponse1), 200),
new HttpResponse(toStream(continueResponse1), 200), new HttpResponse(toStream(endResponse1), 200),
new HttpResponse(toStream(initialResponse2), 200), new HttpResponse(toStream(endResponse2), 200),
new HttpResponse(toStream(initialResponse3), 200), new HttpResponse(toStream(dataSummaryResponse2), 200),
new HttpResponse(toStream(indexResponse), 200), new HttpResponse(toStream(initialResponse4), 200),
new HttpResponse(toStream(endResponse4), 200), new HttpResponse(toStream(response5), 200),
new HttpResponse(toStream(finalDataSummaryResponse), 200));
MockHttpRequester requester = new MockHttpRequester(responses);
createExtractor(requester);
extractor.newSearch(1400000000000L, 1407200000000L, jobLogger);
assertTrue(extractor.hasNext());
assertEquals(initialResponse1, streamToString(extractor.next().get()));
assertTrue(extractor.hasNext());
assertEquals(continueResponse1, streamToString(extractor.next().get()));
assertTrue(extractor.hasNext());
assertEquals(initialResponse2, streamToString(extractor.next().get()));
assertTrue(extractor.hasNext());
assertEquals(initialResponse4, streamToString(extractor.next().get()));
assertTrue(extractor.hasNext());
assertFalse(extractor.next().isPresent());
assertFalse(extractor.hasNext());
requester.assertEqualRequestsToResponses();
requester.assertResponsesHaveBeenConsumed();
int requestCount = 0;
RequestParams requestParams = requester.getGetRequestParams(requestCount++);
assertEquals("http://localhost:9200/index-*/dataType/_search?size=1", requestParams.url);
String expectedDataSummaryBody = "{" + " \"sort\": [{\"_doc\":{\"order\":\"asc\"}}]," + " \"query\": {" + " \"bool\": {"
+ " \"filter\": [" + " {\"match_all\":{}}," + " {" + " \"range\": {" + " \"time\": {"
+ " \"gte\": \"2014-05-13T16:53:20.000Z\"," + " \"lt\": \"2014-08-05T00:53:20.000Z\","
+ " \"format\": \"date_time\"" + " }" + " }" + " }" + " ]" + " }" + " },"
+ " \"aggs\":{" + " \"earliestTime\":{" + " \"min\":{\"field\":\"time\"}" + " }," + " \"latestTime\":{"
+ " \"max\":{\"field\":\"time\"}" + " }" + " }" + "}";
assertEquals(expectedDataSummaryBody.replace(" ", ""), requestParams.requestBody.replace(" ", ""));
requestParams = requester.getGetRequestParams(requestCount++);
assertEquals("http://localhost:9200/dataIndex/_settings", requestParams.url);
assertNull(requestParams.requestBody);
requestParams = requester.getGetRequestParams(requestCount++);
assertEquals("http://localhost:9200/index-*/dataType/_search?scroll=60m&size=1000", requestParams.url);
String expectedSearchBody = "{" + " \"sort\": [" + " {\"time\": {\"order\": \"asc\"}}" + " ]," + " \"query\": {"
+ " \"bool\": {" + " \"filter\": [" + " {\"match_all\":{}}," + " {" + " \"range\": {"
+ " \"time\": {" + " \"gte\": \"2014-05-13T16:53:21.000Z\","
+ " \"lt\": \"2014-05-13T17:53:21.000Z\"," + " \"format\": \"date_time\"" + " }"
+ " }" + " }" + " ]" + " }" + " }" + "}";
assertEquals(expectedSearchBody.replaceAll(" ", ""), requestParams.requestBody.replaceAll(" ", ""));
requestParams = requester.getGetRequestParams(requestCount++);
assertEquals("http://localhost:9200/_search/scroll?scroll=60m", requestParams.url);
assertEquals("scrollId_1", requestParams.requestBody);
requestParams = requester.getGetRequestParams(requestCount++);
assertEquals("http://localhost:9200/_search/scroll?scroll=60m", requestParams.url);
assertEquals("scrollId_2", requestParams.requestBody);
requestParams = requester.getGetRequestParams(requestCount++);
expectedSearchBody = "{" + " \"sort\": [" + " {\"time\": {\"order\": \"asc\"}}" + " ]," + " \"query\": {" + " \"bool\": {"
+ " \"filter\": [" + " {\"match_all\":{}}," + " {" + " \"range\": {" + " \"time\": {"
+ " \"gte\": \"2014-05-13T17:53:21.000Z\"," + " \"lt\": \"2014-05-13T18:53:21.000Z\","
+ " \"format\": \"date_time\"" + " }" + " }" + " }" + " ]" + " }" + " }"
+ "}";
assertEquals("http://localhost:9200/index-*/dataType/_search?scroll=60m&size=1000", requestParams.url);
assertEquals(expectedSearchBody.replaceAll(" ", ""), requestParams.requestBody.replaceAll(" ", ""));
requestParams = requester.getGetRequestParams(requestCount++);
assertEquals("http://localhost:9200/_search/scroll?scroll=60m", requestParams.url);
assertEquals("scrollId_4", requestParams.requestBody);
requestParams = requester.getGetRequestParams(requestCount++);
expectedSearchBody = "{" + " \"sort\": [" + " {\"time\": {\"order\": \"asc\"}}" + " ]," + " \"query\": {" + " \"bool\": {"
+ " \"filter\": [" + " {\"match_all\":{}}," + " {" + " \"range\": {" + " \"time\": {"
+ " \"gte\": \"2014-05-13T18:53:21.000Z\"," + " \"lt\": \"2014-05-13T19:53:21.000Z\","
+ " \"format\": \"date_time\"" + " }" + " }" + " }" + " ]" + " }" + " }"
+ "}";
assertEquals("http://localhost:9200/index-*/dataType/_search?scroll=60m&size=1000", requestParams.url);
assertEquals(expectedSearchBody.replaceAll(" ", ""), requestParams.requestBody.replaceAll(" ", ""));
requestParams = requester.getGetRequestParams(requestCount++);
assertEquals("http://localhost:9200/index-*/dataType/_search?size=1", requestParams.url);
expectedDataSummaryBody = "{" + " \"sort\": [{\"_doc\":{\"order\":\"asc\"}}]," + " \"query\": {" + " \"bool\": {"
+ " \"filter\": [" + " {\"match_all\":{}}," + " {" + " \"range\": {" + " \"time\": {"
+ " \"gte\": \"2014-05-13T19:53:21.000Z\"," + " \"lt\": \"2014-08-05T00:53:20.000Z\","
+ " \"format\": \"date_time\"" + " }" + " }" + " }" + " ]" + " }" + " },"
+ " \"aggs\":{" + " \"earliestTime\":{" + " \"min\":{\"field\":\"time\"}" + " }," + " \"latestTime\":{"
+ " \"max\":{\"field\":\"time\"}" + " }" + " }" + "}";
assertEquals(expectedDataSummaryBody.replace(" ", ""), requestParams.requestBody.replace(" ", ""));
requestParams = requester.getGetRequestParams(requestCount++);
assertEquals("http://localhost:9200/dataIndex/_settings", requestParams.url);
assertNull(requestParams.requestBody);
requestParams = requester.getGetRequestParams(requestCount++);
expectedSearchBody = "{" + " \"sort\": [" + " {\"time\": {\"order\": \"asc\"}}" + " ]," + " \"query\": {" + " \"bool\": {"
+ " \"filter\": [" + " {\"match_all\":{}}," + " {" + " \"range\": {" + " \"time\": {"
+ " \"gte\": \"2014-05-13T18:53:21.000Z\"," + " \"lt\": \"2014-05-13T18:53:31.000Z\","
+ " \"format\": \"date_time\"" + " }" + " }" + " }" + " ]" + " }" + " }"
+ "}";
assertEquals("http://localhost:9200/index-*/dataType/_search?scroll=60m&size=1000", requestParams.url);
assertEquals(expectedSearchBody.replaceAll(" ", ""), requestParams.requestBody.replaceAll(" ", ""));
requestParams = requester.getGetRequestParams(requestCount++);
assertEquals("http://localhost:9200/_search/scroll?scroll=60m", requestParams.url);
assertEquals("scrollId_7", requestParams.requestBody);
requestParams = requester.getGetRequestParams(requestCount++);
expectedSearchBody = "{" + " \"sort\": [" + " {\"time\": {\"order\": \"asc\"}}" + " ]," + " \"query\": {" + " \"bool\": {"
+ " \"filter\": [" + " {\"match_all\":{}}," + " {" + " \"range\": {" + " \"time\": {"
+ " \"gte\": \"2014-05-13T18:53:31.000Z\"," + " \"lt\": \"2014-05-13T18:53:41.000Z\","
+ " \"format\": \"date_time\"" + " }" + " }" + " }" + " ]" + " }" + " }"
+ "}";
assertEquals("http://localhost:9200/index-*/dataType/_search?scroll=60m&size=1000", requestParams.url);
assertEquals(expectedSearchBody.replaceAll(" ", ""), requestParams.requestBody.replaceAll(" ", ""));
requestParams = requester.getGetRequestParams(requestCount++);
assertEquals("http://localhost:9200/index-*/dataType/_search?size=1", requestParams.url);
expectedDataSummaryBody = "{" + " \"sort\": [{\"_doc\":{\"order\":\"asc\"}}]," + " \"query\": {" + " \"bool\": {"
+ " \"filter\": [" + " {\"match_all\":{}}," + " {" + " \"range\": {" + " \"time\": {"
+ " \"gte\": \"2014-05-13T18:53:41.000Z\"," + " \"lt\": \"2014-08-05T00:53:20.000Z\","
+ " \"format\": \"date_time\"" + " }" + " }" + " }" + " ]" + " }" + " },"
+ " \"aggs\":{" + " \"earliestTime\":{" + " \"min\":{\"field\":\"time\"}" + " }," + " \"latestTime\":{"
+ " \"max\":{\"field\":\"time\"}" + " }" + " }" + "}";
assertEquals(expectedDataSummaryBody.replace(" ", ""), requestParams.requestBody.replace(" ", ""));
assertEquals(requestCount, requester.requestCount);
String[] deletedScrollIds = { "scrollId_3", "scrollId_5", "scrollId_6", "scrollId_8", "scrollId_9" };
assertEquals(5, requester.deleteRequestParams.size());
for (int i = 0; i < deletedScrollIds.length; i++) {
assertEquals("http://localhost:9200/_search/scroll", requester.getDeleteRequestParams(i).url);
assertEquals(String.format(Locale.ROOT, "{\"scroll_id\":[\"%s\"]}", deletedScrollIds[i]),
requester.getDeleteRequestParams(i).requestBody);
}
}
public void testChunkedDataExtraction_GivenZeroHits() throws IOException {
String dataSummaryResponse = "{" + "\"took\":17," + "\"timed_out\":false," + "\"_shards\":{" + " \"total\":1,"
+ " \"successful\":1," + " \"failed\":0" + "}," + "\"hits\":{" + " \"total\":0," + " \"max_score\":null,"
+ " \"hits\":[]" + "}," + "\"aggregations\":{" + "\"earliestTime\":null," + "\"latestTime\":null" + "}" + "}";
String searchResponse = "{" + "\"_scroll_id\":\"scrollId_1\"," + "\"took\":17," + "\"timed_out\":false," + "\"_shards\":{"
+ " \"total\":1," + " \"successful\":1," + " \"failed\":0" + "}," + "\"hits\":{" + " \"total\":0,"
+ " \"max_score\":null," + " \"hits\":[]" + "}" + "}";
List<HttpResponse> responses = Arrays.asList(new HttpResponse(toStream(dataSummaryResponse), 200),
new HttpResponse(toStream(searchResponse), 200));
MockHttpRequester requester = new MockHttpRequester(responses);
createExtractor(requester);
extractor.newSearch(1400000000000L, 1407200000000L, jobLogger);
assertTrue(extractor.hasNext());
assertFalse(extractor.next().isPresent());
assertFalse(extractor.hasNext());
}
public void testChunkedDataExtraction_GivenDataSummaryRequestIsNotOk() throws IOException {
String dataSummaryResponse = "{}";
List<HttpResponse> responses = Arrays.asList(new HttpResponse(toStream(dataSummaryResponse), 400));
MockHttpRequester requester = new MockHttpRequester(responses);
createExtractor(requester);
IOException e = expectThrows(IOException.class, () -> extractor.newSearch(1400000000000L, 1407200000000L, jobLogger));
assertEquals("Request 'http://localhost:9200/index-*/dataType/_search?size=1' " + "failed with status code: 400. Response was:\n"
+ dataSummaryResponse, e.getMessage());
}
public void testChunkedDataExtraction_GivenEmptyDataSummaryResponse() throws IOException {
String dataSummaryResponse = "{}";
List<HttpResponse> responses = Arrays.asList(new HttpResponse(toStream(dataSummaryResponse), 200));
MockHttpRequester requester = new MockHttpRequester(responses);
createExtractor(requester);
IOException e = expectThrows(IOException.class, () -> extractor.newSearch(1400000000000L, 1407200000000L, jobLogger));
assertEquals("Failed to parse string from pattern '\"hits\":\\{.*?\"total\":(.*?),'. Response was:\n" + dataSummaryResponse,
e.getMessage());
}
public void testChunkedDataExtraction_GivenTotalHitsCannotBeParsed() throws IOException {
String dataSummaryResponse = "{" + "\"hits\":{" + " \"total\":\"NaN\"," + " \"max_score\":null," + " \"hits\":[]" + "}," + "}";
List<HttpResponse> responses = Arrays.asList(new HttpResponse(toStream(dataSummaryResponse), 200));
MockHttpRequester requester = new MockHttpRequester(responses);
createExtractor(requester);
IOException e = expectThrows(IOException.class, () -> extractor.newSearch(1400000000000L, 1407200000000L, jobLogger));
assertEquals("Failed to parse long from pattern '\"hits\":\\{.*?\"total\":(.*?),'. Response was:\n" + dataSummaryResponse,
e.getMessage());
}
public void testCancel_GivenChunked() throws IOException {
String dataSummaryResponse = "{" + "\"took\":17," + "\"timed_out\":false," + "\"_shards\":{" + " \"total\":1,"
+ " \"successful\":1," + " \"failed\":0" + "}," + "\"hits\":{" + " \"total\":10000," + " \"max_score\":null,"
+ " \"hits\":[" + " \"_index\":\"dataIndex\"," + " \"_type\":\"dataType\"," + " \"_id\":\"1403481600\","
+ " \"_score\":null," + " \"_source\":{" + " \"id\":\"1403481600\"" + " }" + " ]" + "},"
+ "\"aggregations\":{" + "\"earliestTime\":{" + "\"value\":1400000001000," + "\"value_as_string\":\"2014-05-13T16:53:21Z\""
+ "}," + "\"latestTime\":{" + "\"value\":1400007201000," + "\"value_as_string\":\"2014-05-13T17:16:01Z\"" + "}" + "}" + "}";
String indexResponse = "{" + "\"dataIndex\":{" + " \"settings\":{" + " \"index\":{" + " \"creation_date\":0,"
+ " \"number_of_shards\":\"5\"," + " \"number_of_replicas\":\"1\"" + " }" + " }" + "}";
String initialResponse1 = "{" + "\"_scroll_id\":\"scrollId_1\"," + "\"took\":17," + "\"timed_out\":false," + "\"_shards\":{"
+ " \"total\":1," + " \"successful\":1," + " \"failed\":0" + "}," + "\"hits\":{" + " \"total\":10000,"
+ " \"max_score\":null," + " \"hits\":[" + " \"_index\":\"dataIndex\"," + " \"_type\":\"dataType\","
+ " \"_id\":\"1403481600\"," + " \"_score\":null," + " \"_source\":{" + " \"id\":\"1403481600\"" + " }"
+ " ]" + "}" + "}";
String continueResponse1 = "{" + "\"_scroll_id\":\"scrollId_2\"," + "\"took\":8," + "\"timed_out\":false," + "\"_shards\":{"
+ " \"total\":1," + " \"successful\":1," + " \"failed\":0" + "}," + "\"hits\":{" + " \"total\":10000,"
+ " \"max_score\":null," + " \"hits\":[" + " \"_index\":\"dataIndex\"," + " \"_type\":\"dataType\","
+ " \"_id\":\"1403782200\"," + " \"_score\":null," + " \"_source\":{" + " \"id\":\"1403782200\"" + " }"
+ " ]" + "}" + "}";
String endResponse1 = "{" + "\"_scroll_id\":\"scrollId_3\"," + "\"took\":8," + "\"timed_out\":false," + "\"_shards\":{"
+ " \"total\":1," + " \"successful\":1," + " \"failed\":0" + "}," + "\"hits\":{" + " \"total\":10000,"
+ " \"max_score\":null," + " \"hits\":[]" + "}" + "}";
String initialResponse2 = "{" + "\"_scroll_id\":\"scrollId_4\"," + "\"hits\":{" + " \"total\":10000," + " \"hits\":["
+ " \"_index\":\"dataIndex\"" + " ]" + "}" + "}";
String endResponse2 = "{" + "\"_scroll_id\":\"scrollId_5\"," + "\"hits\":[]" + "}";
List<HttpResponse> responses = Arrays.asList(new HttpResponse(toStream(dataSummaryResponse), 200),
new HttpResponse(toStream(indexResponse), 200), new HttpResponse(toStream(initialResponse1), 200),
new HttpResponse(toStream(continueResponse1), 200), new HttpResponse(toStream(endResponse1), 200),
new HttpResponse(toStream(initialResponse2), 200), new HttpResponse(toStream(endResponse2), 200));
MockHttpRequester requester = new MockHttpRequester(responses);
createExtractor(requester);
extractor.newSearch(1400000000000L, 1407200000000L, jobLogger);
extractor.cancel();
assertTrue(extractor.hasNext());
assertEquals(initialResponse1, streamToString(extractor.next().get()));
assertTrue(extractor.hasNext());
assertEquals(continueResponse1, streamToString(extractor.next().get()));
assertTrue(extractor.hasNext());
assertFalse(extractor.next().isPresent());
assertFalse(extractor.hasNext());
assertEquals("http://localhost:9200/_search/scroll", requester.getDeleteRequestParams(0).url);
assertEquals("{\"scroll_id\":[\"scrollId_3\"]}", requester.getDeleteRequestParams(0).requestBody);
assertEquals(1, requester.deleteRequestParams.size());
extractor.newSearch(1407200000000L, 1407203600000L, jobLogger);
assertTrue(extractor.hasNext());
}
private static InputStream toStream(String input) {
return new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8));
}
private static String streamToString(InputStream stream) throws IOException {
try (BufferedReader buffer = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
return buffer.lines().collect(Collectors.joining("\n")).trim();
}
}
private void createExtractor(MockHttpRequester httpRequester) {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(SEARCH, aggregations, scriptFields, TIME_FIELD);
ElasticsearchUrlBuilder urlBuilder = ElasticsearchUrlBuilder.create(INDEXES, TYPES);
extractor = new ElasticsearchDataExtractor(httpRequester, urlBuilder, queryBuilder, 1000);
}
private static class MockHttpRequester extends HttpRequester {
private List<HttpResponse> getResponses;
private List<HttpResponse> deleteResponses;
private int requestCount = 0;
private List<RequestParams> getRequestParams;
private List<RequestParams> deleteRequestParams;
public MockHttpRequester(List<HttpResponse> responses) {
getResponses = responses;
deleteResponses = new ArrayList<>();
getRequestParams = new ArrayList<>(responses.size());
deleteRequestParams = new ArrayList<>();
}
@Override
public HttpResponse get(String url, String requestBody) {
getRequestParams.add(new RequestParams(url, requestBody));
return getResponses.get(requestCount++);
}
@Override
public HttpResponse delete(String url, String requestBody) {
deleteRequestParams.add(new RequestParams(url, requestBody));
HttpResponse response = new HttpResponse(toStream(CLEAR_SCROLL_RESPONSE), 200);
deleteResponses.add(response);
return response;
}
public RequestParams getGetRequestParams(int callCount) {
return getRequestParams.get(callCount);
}
public RequestParams getDeleteRequestParams(int callCount) {
return deleteRequestParams.get(callCount);
}
public void assertEqualRequestsToResponses() {
assertEquals(getResponses.size(), getRequestParams.size());
}
public void assertResponsesHaveBeenConsumed() throws IOException {
for (HttpResponse response : getResponses) {
assertEquals(0, response.getStream().available());
}
for (HttpResponse response : deleteResponses) {
assertEquals(0, response.getStream().available());
}
}
}
private static class RequestParams {
public final String url;
public final String requestBody;
public RequestParams(String url, String requestBody) {
this.url = url;
this.requestBody = requestBody;
}
}
}

View File

@ -1,135 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.scheduler.http;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.test.ESTestCase;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
public class ElasticsearchQueryBuilderTests extends ESTestCase {
private static final String MATCH_ALL_QUERY = "{\"match_all\":{}}";
public void testCreateSearchBody_GivenQueryOnly() {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(MATCH_ALL_QUERY, null, null, "time");
assertFalse(queryBuilder.isAggregated());
String searchBody = queryBuilder.createSearchBody(1451606400000L, 1451610000000L);
String expected = "{" + " \"sort\": [" + " {\"time\": {\"order\": \"asc\"}}" + " ]," + " \"query\": {" + " \"bool\": {"
+ " \"filter\": [" + " {\"match_all\":{}}," + " {" + " \"range\": {" + " \"time\": {"
+ " \"gte\": \"2016-01-01T00:00:00.000Z\"," + " \"lt\": \"2016-01-01T01:00:00.000Z\","
+ " \"format\": \"date_time\"" + " }" + " }" + " }" + " ]" + " }" + " }"
+ "}";
assertEquals(expected.replaceAll(" ", ""), searchBody.replaceAll(" ", ""));
}
public void testCreateSearchBody_GivenQueryAndScriptFields() {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(MATCH_ALL_QUERY, null,
"{\"test1\":{\"script\": \"...\"}}", "@timestamp");
assertFalse(queryBuilder.isAggregated());
String searchBody = queryBuilder.createSearchBody(1451606400000L, 1451610000000L);
String expected = "{" + " \"sort\": [" + " {\"@timestamp\": {\"order\": \"asc\"}}" + " ]," + " \"query\": {"
+ " \"bool\": {" + " \"filter\": [" + " {\"match_all\":{}}," + " {" + " \"range\": {"
+ " \"@timestamp\": {" + " \"gte\": \"2016-01-01T00:00:00.000Z\","
+ " \"lt\": \"2016-01-01T01:00:00.000Z\"," + " \"format\": \"date_time\"" + " }"
+ " }" + " }" + " ]" + " }" + " }," + " \"script_fields\": {\"test1\":{\"script\":\"...\"}}"
+ "}";
assertEquals(expected.replaceAll(" ", ""), searchBody.replaceAll(" ", ""));
}
public void testCreateSearchBody_GivenQueryAndAggs() {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(MATCH_ALL_QUERY, "{\"my_aggs\":{}}", null, "time");
assertTrue(queryBuilder.isAggregated());
String searchBody = queryBuilder.createSearchBody(1451606400000L, 1451610000000L);
String expected = "{" + " \"sort\": [" + " {\"time\": {\"order\": \"asc\"}}" + " ]," + " \"query\": {" + " \"bool\": {"
+ " \"filter\": [" + " {\"match_all\":{}}," + " {" + " \"range\": {" + " \"time\": {"
+ " \"gte\": \"2016-01-01T00:00:00.000Z\"," + " \"lt\": \"2016-01-01T01:00:00.000Z\","
+ " \"format\": \"date_time\"" + " }" + " }" + " }" + " ]" + " }" + " },"
+ " \"aggs\":{\"my_aggs\":{}}" + "}";
assertEquals(expected.replaceAll(" ", ""), searchBody.replaceAll(" ", ""));
}
public void testCreateDataSummaryQuery_GivenQueryOnly() {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(MATCH_ALL_QUERY, null, null, "@timestamp");
assertFalse(queryBuilder.isAggregated());
String dataSummaryQuery = queryBuilder.createDataSummaryQuery(1451606400000L, 1451610000000L);
String expected = "{" + " \"sort\": [" + " {\"_doc\": {\"order\": \"asc\"}}" + " ]," + " \"query\": {" + " \"bool\": {"
+ " \"filter\": [" + " {\"match_all\":{}}," + " {" + " \"range\": {"
+ " \"@timestamp\": {" + " \"gte\": \"2016-01-01T00:00:00.000Z\","
+ " \"lt\": \"2016-01-01T01:00:00.000Z\"," + " \"format\": \"date_time\"" + " }"
+ " }" + " }" + " ]" + " }" + " }," + " \"aggs\":{" + " \"earliestTime\":{"
+ " \"min\":{\"field\":\"@timestamp\"}" + " }," + " \"latestTime\":{"
+ " \"max\":{\"field\":\"@timestamp\"}" + " }" + " }" + "}";
assertEquals(expected.replaceAll(" ", ""), dataSummaryQuery.replaceAll(" ", ""));
}
public void testCreateDataSummaryQuery_GivenQueryAndScriptFields() {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(MATCH_ALL_QUERY, null,
"{\"test1\":{\"script\": \"...\"}}", "@timestamp");
assertFalse(queryBuilder.isAggregated());
String dataSummaryQuery = queryBuilder.createDataSummaryQuery(1451606400000L, 1451610000000L);
String expected = "{" + " \"sort\": [" + " {\"_doc\": {\"order\": \"asc\"}}" + " ]," + " \"query\": {" + " \"bool\": {"
+ " \"filter\": [" + " {\"match_all\":{}}," + " {" + " \"range\": {"
+ " \"@timestamp\": {" + " \"gte\": \"2016-01-01T00:00:00.000Z\","
+ " \"lt\": \"2016-01-01T01:00:00.000Z\"," + " \"format\": \"date_time\"" + " }"
+ " }" + " }" + " ]" + " }" + " }," + " \"aggs\":{" + " \"earliestTime\":{"
+ " \"min\":{\"field\":\"@timestamp\"}" + " }," + " \"latestTime\":{"
+ " \"max\":{\"field\":\"@timestamp\"}" + " }" + " }" + "}";
assertEquals(expected.replaceAll(" ", ""), dataSummaryQuery.replaceAll(" ", ""));
}
public void testCreateDataSummaryQuery_GivenQueryAndAggs() {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(MATCH_ALL_QUERY, "{\"my_aggs\":{}}", null, "@timestamp");
assertTrue(queryBuilder.isAggregated());
String dataSummaryQuery = queryBuilder.createDataSummaryQuery(1451606400000L, 1451610000000L);
String expected = "{" + " \"sort\": [" + " {\"_doc\": {\"order\": \"asc\"}}" + " ]," + " \"query\": {" + " \"bool\": {"
+ " \"filter\": [" + " {\"match_all\":{}}," + " {" + " \"range\": {"
+ " \"@timestamp\": {" + " \"gte\": \"2016-01-01T00:00:00.000Z\","
+ " \"lt\": \"2016-01-01T01:00:00.000Z\"," + " \"format\": \"date_time\"" + " }"
+ " }" + " }" + " ]" + " }" + " }," + " \"aggs\":{" + " \"earliestTime\":{"
+ " \"min\":{\"field\":\"@timestamp\"}" + " }," + " \"latestTime\":{"
+ " \"max\":{\"field\":\"@timestamp\"}" + " }" + " }" + "}";
assertEquals(expected.replaceAll(" ", ""), dataSummaryQuery.replaceAll(" ", ""));
}
public void testLogQueryInfo_GivenNoAggsNoFields() {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(MATCH_ALL_QUERY, null, null, "@timestamp");
Logger logger = mock(Logger.class);
queryBuilder.logQueryInfo(logger);
verify(logger).debug("Will retrieve whole _source document from Elasticsearch");
}
public void testLogQueryInfo() {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(MATCH_ALL_QUERY, "{\"my_aggs\":{ \"foo\": \"bar\" }}",
null, "@timestamp");
Logger logger = mock(Logger.class);
queryBuilder.logQueryInfo(logger);
verify(logger).debug("Will use the following Elasticsearch aggregations: {\"my_aggs\":{ \"foo\": \"bar\" }}");
}
}

View File

@ -1,60 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.scheduler.http;
import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
public class ElasticsearchUrlBuilderTests extends ESTestCase {
private static final List<String> SINGLE_INDEX = Arrays.asList("foo-*");
private static final List<String> TWO_INDEXES = Arrays.asList("index_1", "index_2");
private static final List<String> EMPTY_TYPES = Collections.emptyList();
private static final List<String> TWO_TYPES = Arrays.asList("type_1", "type_2");
public void testBuildIndexSettingsUrl() {
String url = ElasticsearchUrlBuilder.create(SINGLE_INDEX, TWO_TYPES).buildIndexSettingsUrl("foo");
assertEquals("http://localhost:9200/foo/_settings", url);
}
public void testBuildInitScrollUrl_GivenMultipleIndicesAndTypes() {
String url = ElasticsearchUrlBuilder.create(TWO_INDEXES, TWO_TYPES).buildInitScrollUrl(5000);
assertEquals("http://localhost:9200/index_1,index_2/type_1,type_2/_search?scroll=60m&size=5000", url);
}
public void testBuildContinueScrollUrl() {
String url = ElasticsearchUrlBuilder.create(SINGLE_INDEX, TWO_TYPES).buildContinueScrollUrl();
assertEquals("http://localhost:9200/_search/scroll?scroll=60m", url);
}
public void testBuildClearScrollUrl() {
String url = ElasticsearchUrlBuilder.create(SINGLE_INDEX, TWO_TYPES).buildClearScrollUrl();
assertEquals("http://localhost:9200/_search/scroll", url);
}
public void testBuildSearchSizeOneUrl_GivenMultipleIndicesAndTypes() {
String url = ElasticsearchUrlBuilder.create(TWO_INDEXES, TWO_TYPES).buildSearchSizeOneUrl();
assertEquals("http://localhost:9200/index_1,index_2/type_1,type_2/_search?size=1", url);
}
public void testBuildSearchSizeOneUrl_GivenMultipleIndicesAndEmptyTypes() {
String url = ElasticsearchUrlBuilder.create(TWO_INDEXES, EMPTY_TYPES).buildSearchSizeOneUrl();
assertEquals("http://localhost:9200/index_1,index_2/_search?size=1", url);
}
public void testGetBaseUrl_GivenNoEndingSlash() {
String url = ElasticsearchUrlBuilder.create(SINGLE_INDEX, TWO_TYPES).getBaseUrl();
assertEquals("http://localhost:9200/", url);
}
public void testGetBaseUrl_GivenEndingSlash() {
String url = ElasticsearchUrlBuilder.create(SINGLE_INDEX, TWO_TYPES).getBaseUrl();
assertEquals("http://localhost:9200/", url);
}
}

View File

@ -1,40 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.scheduler.http;
import org.elasticsearch.test.ESTestCase;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
public class HttpResponseTests extends ESTestCase {
public void testGetResponseAsStream() throws IOException {
InputStream stream = new ByteArrayInputStream("foo\nbar".getBytes(StandardCharsets.UTF_8));
HttpResponse response = new HttpResponse(stream, 200);
assertEquals("foo\nbar", response.getResponseAsString());
assertEquals(200, response.getResponseCode());
}
public void testGetResponseAsStream_GivenStreamThrows() throws IOException {
InputStream stream = mock(InputStream.class);
HttpResponse response = new HttpResponse(stream, 200);
try {
response.getResponseAsString();
fail();
} catch (UncheckedIOException e) {
verify(stream).close();
}
}
}