Remove unnecessary parts of scheduler config (elastic/elasticsearch#468)

* Remove credentials from SchedulerConfig

* Remove dataSource and baseUrl from SchedulerConfig

Original commit: elastic/x-pack-elasticsearch@f5b92be252
This commit is contained in:
Dimitris Athanasiou 2016-12-06 11:38:39 +00:00 committed by GitHub
parent 3f35eac183
commit 50df0d4326
18 changed files with 206 additions and 725 deletions

View File

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.prelert;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
@@ -16,7 +17,9 @@ import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.env.Environment;
import org.elasticsearch.http.HttpTransportSettings;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestHandler;
@@ -174,7 +177,9 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
DataProcessor dataProcessor = new AutodetectProcessManager(settings, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, autodetectResultsParser, processFactory);
ScheduledJobService scheduledJobService = new ScheduledJobService(threadPool, client, jobProvider, dataProcessor,
new HttpDataExtractorFactory(), System::currentTimeMillis);
// norelease: we will no longer need to pass the client here after we switch to a client based data extractor
new HttpDataExtractorFactory(client),
System::currentTimeMillis);
return Arrays.asList(
jobProvider,
jobManager,

View File

@@ -690,21 +690,19 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
if (analysisConfig.getBucketSpan() == null) {
throw new IllegalArgumentException(Messages.getMessage(Messages.JOB_CONFIG_SCHEDULER_REQUIRES_BUCKET_SPAN));
}
if (schedulerConfig.getDataSource() == SchedulerConfig.DataSource.ELASTICSEARCH) {
if (analysisConfig.getLatency() != null && analysisConfig.getLatency() > 0) {
throw new IllegalArgumentException(
Messages.getMessage(Messages.JOB_CONFIG_SCHEDULER_ELASTICSEARCH_DOES_NOT_SUPPORT_LATENCY));
}
if (schedulerConfig.getAggregationsOrAggs() != null
&& !SchedulerConfig.DOC_COUNT.equals(analysisConfig.getSummaryCountFieldName())) {
throw new IllegalArgumentException(
Messages.getMessage(Messages.JOB_CONFIG_SCHEDULER_AGGREGATIONS_REQUIRES_SUMMARY_COUNT_FIELD,
SchedulerConfig.DataSource.ELASTICSEARCH.toString(), SchedulerConfig.DOC_COUNT));
}
if (dataDescription == null || dataDescription.getFormat() != DataDescription.DataFormat.ELASTICSEARCH) {
throw new IllegalArgumentException(
Messages.getMessage(Messages.JOB_CONFIG_SCHEDULER_ELASTICSEARCH_REQUIRES_DATAFORMAT_ELASTICSEARCH));
}
if (analysisConfig.getLatency() != null && analysisConfig.getLatency() > 0) {
throw new IllegalArgumentException(
Messages.getMessage(Messages.JOB_CONFIG_SCHEDULER_ELASTICSEARCH_DOES_NOT_SUPPORT_LATENCY));
}
if (schedulerConfig.getAggregationsOrAggs() != null
&& !SchedulerConfig.DOC_COUNT.equals(analysisConfig.getSummaryCountFieldName())) {
throw new IllegalArgumentException(
Messages.getMessage(Messages.JOB_CONFIG_SCHEDULER_AGGREGATIONS_REQUIRES_SUMMARY_COUNT_FIELD,
SchedulerConfig.DOC_COUNT));
}
if (dataDescription == null || dataDescription.getFormat() != DataDescription.DataFormat.ELASTICSEARCH) {
throw new IllegalArgumentException(
Messages.getMessage(Messages.JOB_CONFIG_SCHEDULER_ELASTICSEARCH_REQUIRES_DATAFORMAT_ELASTICSEARCH));
}
}
if (transforms != null && transforms.isEmpty() == false) {

View File

@@ -9,24 +9,18 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.support.ToXContentToBytes;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.prelert.job.messages.Messages;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
@@ -54,15 +48,8 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
public static final String DOC_COUNT = "doc_count";
// NORELEASE: no camel casing:
public static final ParseField DATA_SOURCE = new ParseField("data_source");
public static final ParseField QUERY_DELAY = new ParseField("query_delay");
public static final ParseField FREQUENCY = new ParseField("frequency");
public static final ParseField FILE_PATH = new ParseField("file_path");
public static final ParseField TAIL_FILE = new ParseField("tail_file");
public static final ParseField BASE_URL = new ParseField("base_url");
public static final ParseField USERNAME = new ParseField("username");
public static final ParseField PASSWORD = new ParseField("password");
public static final ParseField ENCRYPTED_PASSWORD = new ParseField("encrypted_password");
public static final ParseField INDEXES = new ParseField("indexes");
public static final ParseField TYPES = new ParseField("types");
public static final ParseField QUERY = new ParseField("query");
@@ -76,26 +63,15 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
*/
public static final ParseField SCRIPT_FIELDS = new ParseField("script_fields");
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<SchedulerConfig.Builder, ParseFieldMatcherSupplier> PARSER =
new ConstructingObjectParser<>("schedule_config", a -> new SchedulerConfig.Builder((DataSource) a[0]));
new ConstructingObjectParser<>("schedule_config", a -> new SchedulerConfig.Builder((List<String>) a[0], (List<String>) a[1]));
static {
PARSER.declareField(ConstructingObjectParser.constructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return DataSource.readFromString(p.text());
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, DATA_SOURCE, ObjectParser.ValueType.STRING);
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), INDEXES);
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), TYPES);
PARSER.declareLong(Builder::setQueryDelay, QUERY_DELAY);
PARSER.declareLong(Builder::setFrequency, FREQUENCY);
PARSER.declareString(Builder::setFilePath, FILE_PATH);
PARSER.declareBoolean(Builder::setTailFile, TAIL_FILE);
PARSER.declareString(Builder::setUsername, USERNAME);
PARSER.declareString(Builder::setPassword, PASSWORD);
PARSER.declareString(Builder::setEncryptedPassword, ENCRYPTED_PASSWORD);
PARSER.declareString(Builder::setBaseUrl, BASE_URL);
PARSER.declareStringArray(Builder::setIndexes, INDEXES);
PARSER.declareStringArray(Builder::setTypes, TYPES);
PARSER.declareObject(Builder::setQuery, (p, c) -> {
try {
return p.map();
@@ -128,9 +104,6 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
PARSER.declareInt(Builder::setScrollSize, SCROLL_SIZE);
}
// NORELEASE: please use primitives where possible here:
private final DataSource dataSource;
/**
* The delay in seconds before starting to query a period of time
*/
@@ -141,25 +114,6 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
*/
private final Long frequency;
/**
* These values apply to the FILE data source
*/
private final String filePath;
private final Boolean tailFile;
/**
* Used for data sources that require credentials. May be null in the case
* where credentials are sometimes needed and sometimes not (e.g.
* Elasticsearch).
*/
private final String username;
private final String password;
private final String encryptedPassword;
/**
* These values apply to the ELASTICSEARCH data source
*/
private final String baseUrl;
private final List<String> indexes;
private final List<String> types;
// NORELEASE: These 4 fields can be reduced to a single
@@ -171,19 +125,11 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
private final Boolean retrieveWholeSource;
private final Integer scrollSize;
private SchedulerConfig(DataSource dataSource, Long queryDelay, Long frequency, String filePath, Boolean tailFile, String username,
String password, String encryptedPassword, String baseUrl, List<String> indexes, List<String> types, Map<String, Object> query,
Map<String, Object> aggregations, Map<String, Object> aggs, Map<String, Object> scriptFields, Boolean retrieveWholeSource,
Integer scrollSize) {
this.dataSource = dataSource;
private SchedulerConfig(Long queryDelay, Long frequency, List<String> indexes, List<String> types, Map<String, Object> query,
Map<String, Object> aggregations, Map<String, Object> aggs, Map<String, Object> scriptFields,
Boolean retrieveWholeSource, Integer scrollSize) {
this.queryDelay = queryDelay;
this.frequency = frequency;
this.filePath = filePath;
this.tailFile = tailFile;
this.username = username;
this.password = password;
this.encryptedPassword = encryptedPassword;
this.baseUrl = baseUrl;
this.indexes = indexes;
this.types = types;
this.query = query;
@@ -195,15 +141,8 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
}
public SchedulerConfig(StreamInput in) throws IOException {
this.dataSource = DataSource.readFromStream(in);
this.queryDelay = in.readOptionalLong();
this.frequency = in.readOptionalLong();
this.filePath = in.readOptionalString();
this.tailFile = in.readOptionalBoolean();
this.username = in.readOptionalString();
this.password = in.readOptionalString();
this.encryptedPassword = in.readOptionalString();
this.baseUrl = in.readOptionalString();
if (in.readBoolean()) {
this.indexes = in.readList(StreamInput::readString);
} else {
@@ -238,15 +177,6 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
this.scrollSize = in.readOptionalVInt();
}
/**
* The data source that the scheduler is to pull data from.
*
* @return The data source.
*/
public DataSource getDataSource() {
return this.dataSource;
}
public Long getQueryDelay() {
return this.queryDelay;
}
@@ -255,44 +185,6 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
return this.frequency;
}
/**
* For the FILE data source only, the path to the file.
*
* @return The path to the file, or <code>null</code> if not set.
*/
public String getFilePath() {
return this.filePath;
}
/**
* For the FILE data source only, should the file be tailed? If not it will
* just be read from once.
*
* @return Should the file be tailed? (<code>null</code> if not set.)
*/
public Boolean getTailFile() {
return this.tailFile;
}
/**
* For the ELASTICSEARCH data source only, the base URL to connect to
* Elasticsearch on.
*
* @return The URL, or <code>null</code> if not set.
*/
public String getBaseUrl() {
return this.baseUrl;
}
/**
* The username to use to connect to the data source (if any).
*
* @return The username, or <code>null</code> if not set.
*/
public String getUsername() {
return this.username;
}
/**
* For the ELASTICSEARCH data source only, one or more indexes to search for
* input data.
@@ -348,29 +240,6 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
return this.scrollSize;
}
/**
* The encrypted password to use to connect to the data source (if any). A
* class outside this package is responsible for encrypting and decrypting
* the password.
*
* @return The password, or <code>null</code> if not set.
*/
public String getEncryptedPassword() {
return encryptedPassword;
}
/**
* The plain text password to use to connect to the data source (if any).
* This is likely to return <code>null</code> most of the time, as the
* intention is that it is only present it initial configurations, and gets
* replaced with an encrypted password as soon as possible after receipt.
*
* @return The password, or <code>null</code> if not set.
*/
public String getPassword() {
return password;
}
/**
* For the ELASTICSEARCH data source only, optional Elasticsearch
* script_fields to add to the search to be submitted to Elasticsearch to
@@ -452,15 +321,8 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
@Override
public void writeTo(StreamOutput out) throws IOException {
dataSource.writeTo(out);
out.writeOptionalLong(queryDelay);
out.writeOptionalLong(frequency);
out.writeOptionalString(filePath);
out.writeOptionalBoolean(tailFile);
out.writeOptionalString(username);
out.writeOptionalString(password);
out.writeOptionalString(encryptedPassword);
out.writeOptionalString(baseUrl);
if (indexes != null) {
out.writeBoolean(true);
out.writeStringList(indexes);
@@ -504,31 +366,12 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DATA_SOURCE.getPreferredName(), dataSource.name().toUpperCase(Locale.ROOT));
if (queryDelay != null) {
builder.field(QUERY_DELAY.getPreferredName(), queryDelay);
}
if (frequency != null) {
builder.field(FREQUENCY.getPreferredName(), frequency);
}
if (filePath != null) {
builder.field(FILE_PATH.getPreferredName(), filePath);
}
if (tailFile != null) {
builder.field(TAIL_FILE.getPreferredName(), tailFile);
}
if (username != null) {
builder.field(USERNAME.getPreferredName(), username);
}
if (password != null) {
builder.field(PASSWORD.getPreferredName(), password);
}
if (encryptedPassword != null) {
builder.field(ENCRYPTED_PASSWORD.getPreferredName(), encryptedPassword);
}
if (baseUrl != null) {
builder.field(BASE_URL.getPreferredName(), baseUrl);
}
if (indexes != null) {
builder.field(INDEXES.getPreferredName(), indexes);
}
@@ -574,11 +417,9 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
SchedulerConfig that = (SchedulerConfig) other;
return Objects.equals(this.dataSource, that.dataSource) && Objects.equals(this.frequency, that.frequency)
&& Objects.equals(this.queryDelay, that.queryDelay) && Objects.equals(this.filePath, that.filePath)
&& Objects.equals(this.tailFile, that.tailFile) && Objects.equals(this.baseUrl, that.baseUrl)
&& Objects.equals(this.username, that.username) && Objects.equals(this.password, that.password)
&& Objects.equals(this.encryptedPassword, that.encryptedPassword) && Objects.equals(this.indexes, that.indexes)
return Objects.equals(this.frequency, that.frequency)
&& Objects.equals(this.queryDelay, that.queryDelay)
&& Objects.equals(this.indexes, that.indexes)
&& Objects.equals(this.types, that.types) && Objects.equals(this.query, that.query)
&& Objects.equals(this.retrieveWholeSource, that.retrieveWholeSource) && Objects.equals(this.scrollSize, that.scrollSize)
&& Objects.equals(this.getAggregationsOrAggs(), that.getAggregationsOrAggs())
@@ -587,42 +428,8 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
@Override
public int hashCode() {
return Objects.hash(this.dataSource, frequency, queryDelay, this.filePath, tailFile, baseUrl, username, password, encryptedPassword,
this.indexes, types, query, retrieveWholeSource, scrollSize, getAggregationsOrAggs(), this.scriptFields);
}
/**
* Enum of the acceptable data sources.
*/
public enum DataSource implements Writeable {
FILE, ELASTICSEARCH;
/**
* Case-insensitive from string method. Works with ELASTICSEARCH,
* Elasticsearch, ElasticSearch, etc.
*
* @param value
* String representation
* @return The data source
*/
public static DataSource readFromString(String value) {
String valueUpperCase = value.toUpperCase(Locale.ROOT);
return DataSource.valueOf(valueUpperCase);
}
public static DataSource readFromStream(StreamInput in) throws IOException {
int ordinal = in.readVInt();
if (ordinal < 0 || ordinal >= values().length) {
throw new IOException("Unknown Operator ordinal [" + ordinal + "]");
}
return values()[ordinal];
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(ordinal());
}
return Objects.hash(frequency, queryDelay, indexes, types, query, retrieveWholeSource, scrollSize, getAggregationsOrAggs(),
scriptFields);
}
public static class Builder {
@@ -635,19 +442,10 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
*/
private static final String MATCH_ALL_ES_QUERY = "match_all";
private final DataSource dataSource;
private Long queryDelay;
private Long frequency;
private String filePath;
private Boolean tailFile;
private String username;
private String password;
private String encryptedPassword;
private String baseUrl;
// NORELEASE: use Collections.emptyList() instead of null as initial
// value:
private List<String> indexes = null;
private List<String> types = null;
private List<String> indexes = Collections.emptyList();
private List<String> types = Collections.emptyList();
// NORELEASE: use Collections.emptyMap() instead of null as initial
// value:
// NORELEASE: Use SearchSourceBuilder
@@ -658,37 +456,21 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
private Boolean retrieveWholeSource;
private Integer scrollSize;
// NORELEASE: figure out what the required fields are and made part of
// the only public constructor
public Builder(DataSource dataSource) {
this.dataSource = Objects.requireNonNull(dataSource);
switch (dataSource) {
case FILE:
setTailFile(false);
break;
case ELASTICSEARCH:
Map<String, Object> query = new HashMap<>();
query.put(MATCH_ALL_ES_QUERY, new HashMap<String, Object>());
setQuery(query);
setQueryDelay(DEFAULT_ELASTICSEARCH_QUERY_DELAY);
setRetrieveWholeSource(false);
setScrollSize(DEFAULT_SCROLL_SIZE);
break;
default:
throw new UnsupportedOperationException("unsupported datasource " + dataSource);
}
public Builder(List<String> indexes, List<String> types) {
this.indexes = Objects.requireNonNull(indexes);
this.types = Objects.requireNonNull(types);
Map<String, Object> query = new HashMap<>();
query.put(MATCH_ALL_ES_QUERY, new HashMap<String, Object>());
setQuery(query);
setQueryDelay(DEFAULT_ELASTICSEARCH_QUERY_DELAY);
setRetrieveWholeSource(false);
setScrollSize(DEFAULT_SCROLL_SIZE);
}
public Builder(SchedulerConfig config) {
this.dataSource = config.dataSource;
this.queryDelay = config.queryDelay;
this.frequency = config.frequency;
this.filePath = config.filePath;
this.tailFile = config.tailFile;
this.username = config.username;
this.password = config.password;
this.encryptedPassword = config.encryptedPassword;
this.baseUrl = config.baseUrl;
this.indexes = config.indexes;
this.types = config.types;
this.query = config.query;
@@ -717,40 +499,6 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
this.frequency = frequency;
}
public void setFilePath(String filePath) {
this.filePath = Objects.requireNonNull(filePath);
}
public void setTailFile(boolean tailFile) {
this.tailFile = tailFile;
}
public void setUsername(String username) {
this.username = Objects.requireNonNull(username);
}
public void setPassword(String password) {
this.password = Objects.requireNonNull(password);
}
public void setEncryptedPassword(String encryptedPassword) {
this.encryptedPassword = Objects.requireNonNull(encryptedPassword);
}
public void setBaseUrl(String baseUrl) {
this.baseUrl = Objects.requireNonNull(baseUrl);
}
public void setIndexes(List<String> indexes) {
// NORELEASE: make use of Collections.unmodifiableList(...)
this.indexes = Objects.requireNonNull(indexes);
}
public void setTypes(List<String> types) {
// NORELEASE: make use of Collections.unmodifiableList(...)
this.types = Objects.requireNonNull(types);
}
public void setQuery(Map<String, Object> query) {
// NORELEASE: make use of Collections.unmodifiableMap(...)
this.query = Objects.requireNonNull(query);
@@ -784,10 +532,6 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
this.scrollSize = scrollSize;
}
public DataSource getDataSource() {
return dataSource;
}
public Long getQueryDelay() {
return queryDelay;
}
@@ -796,30 +540,6 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
return frequency;
}
public String getFilePath() {
return filePath;
}
public Boolean getTailFile() {
return tailFile;
}
public String getUsername() {
return username;
}
public String getPassword() {
return password;
}
public String getEncryptedPassword() {
return encryptedPassword;
}
public String getBaseUrl() {
return baseUrl;
}
public List<String> getIndexes() {
return indexes;
}
@@ -863,88 +583,23 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
}
public SchedulerConfig build() {
switch (dataSource) {
case FILE:
if (Strings.hasLength(filePath) == false) {
throw invalidOptionValue(FILE_PATH.getPreferredName(), filePath);
}
if (baseUrl != null) {
throw notSupportedValue(BASE_URL, dataSource, Messages.JOB_CONFIG_SCHEDULER_FIELD_NOT_SUPPORTED);
}
if (username != null) {
throw notSupportedValue(USERNAME, dataSource, Messages.JOB_CONFIG_SCHEDULER_FIELD_NOT_SUPPORTED);
}
if (password != null) {
throw notSupportedValue(PASSWORD, dataSource, Messages.JOB_CONFIG_SCHEDULER_FIELD_NOT_SUPPORTED);
}
if (encryptedPassword != null) {
throw notSupportedValue(ENCRYPTED_PASSWORD, dataSource, Messages.JOB_CONFIG_SCHEDULER_FIELD_NOT_SUPPORTED);
}
if (indexes != null) {
throw notSupportedValue(INDEXES, dataSource, Messages.JOB_CONFIG_SCHEDULER_FIELD_NOT_SUPPORTED);
}
if (types != null) {
throw notSupportedValue(TYPES, dataSource, Messages.JOB_CONFIG_SCHEDULER_FIELD_NOT_SUPPORTED);
}
if (retrieveWholeSource != null) {
throw notSupportedValue(RETRIEVE_WHOLE_SOURCE, dataSource, Messages.JOB_CONFIG_SCHEDULER_FIELD_NOT_SUPPORTED);
}
if (aggregations != null) {
throw notSupportedValue(AGGREGATIONS, dataSource, Messages.JOB_CONFIG_SCHEDULER_FIELD_NOT_SUPPORTED);
}
if (query != null) {
throw notSupportedValue(QUERY, dataSource, Messages.JOB_CONFIG_SCHEDULER_FIELD_NOT_SUPPORTED);
}
if (scriptFields != null) {
throw notSupportedValue(SCRIPT_FIELDS, dataSource, Messages.JOB_CONFIG_SCHEDULER_FIELD_NOT_SUPPORTED);
}
if (scrollSize != null) {
throw notSupportedValue(SCROLL_SIZE, dataSource, Messages.JOB_CONFIG_SCHEDULER_FIELD_NOT_SUPPORTED);
}
break;
case ELASTICSEARCH:
try {
new URL(baseUrl);
} catch (MalformedURLException e) {
throw invalidOptionValue(BASE_URL.getPreferredName(), baseUrl);
}
boolean isNoPasswordSet = password == null && encryptedPassword == null;
boolean isMultiplePasswordSet = password != null && encryptedPassword != null;
if ((username != null && isNoPasswordSet) || (isNoPasswordSet == false && username == null)) {
String msg = Messages.getMessage(Messages.JOB_CONFIG_SCHEDULER_INCOMPLETE_CREDENTIALS);
throw new IllegalArgumentException(msg);
}
if (isMultiplePasswordSet) {
String msg = Messages.getMessage(Messages.JOB_CONFIG_SCHEDULER_MULTIPLE_PASSWORDS);
throw new IllegalArgumentException(msg);
}
if (indexes == null || indexes.isEmpty() || indexes.contains(null) || indexes.contains("")) {
throw invalidOptionValue(INDEXES.getPreferredName(), indexes);
}
if (types == null || types.isEmpty() || types.contains(null) || types.contains("")) {
throw invalidOptionValue(TYPES.getPreferredName(), types);
}
if (aggregations != null && aggs != null) {
String msg = Messages.getMessage(Messages.JOB_CONFIG_SCHEDULER_MULTIPLE_AGGREGATIONS);
throw new IllegalArgumentException(msg);
}
if (Boolean.TRUE.equals(retrieveWholeSource)) {
if (scriptFields != null) {
throw notSupportedValue(SCRIPT_FIELDS, dataSource, Messages.JOB_CONFIG_SCHEDULER_FIELD_NOT_SUPPORTED);
}
}
if (filePath != null) {
throw notSupportedValue(FILE_PATH, dataSource, Messages.JOB_CONFIG_SCHEDULER_FIELD_NOT_SUPPORTED);
}
if (tailFile != null) {
throw notSupportedValue(TAIL_FILE, dataSource, Messages.JOB_CONFIG_SCHEDULER_FIELD_NOT_SUPPORTED);
}
break;
default:
throw new IllegalStateException("Unexpected datasource [" + dataSource + "]");
if (indexes == null || indexes.isEmpty() || indexes.contains(null) || indexes.contains("")) {
throw invalidOptionValue(INDEXES.getPreferredName(), indexes);
}
return new SchedulerConfig(dataSource, queryDelay, frequency, filePath, tailFile, username, password, encryptedPassword,
baseUrl, indexes, types, query, aggregations, aggs, scriptFields, retrieveWholeSource, scrollSize);
if (types == null || types.isEmpty() || types.contains(null) || types.contains("")) {
throw invalidOptionValue(TYPES.getPreferredName(), types);
}
if (aggregations != null && aggs != null) {
String msg = Messages.getMessage(Messages.JOB_CONFIG_SCHEDULER_MULTIPLE_AGGREGATIONS);
throw new IllegalArgumentException(msg);
}
if (Boolean.TRUE.equals(retrieveWholeSource)) {
if (scriptFields != null) {
throw notSupportedValue(SCRIPT_FIELDS, Messages.JOB_CONFIG_SCHEDULER_FIELD_NOT_SUPPORTED);
}
}
return new SchedulerConfig(queryDelay, frequency, indexes, types, query, aggregations, aggs, scriptFields, retrieveWholeSource,
scrollSize);
}
private static ElasticsearchException invalidOptionValue(String fieldName, Object value) {
@@ -952,8 +607,8 @@ public class SchedulerConfig extends ToXContentToBytes implements Writeable {
throw new IllegalArgumentException(msg);
}
private static ElasticsearchException notSupportedValue(ParseField field, DataSource dataSource, String key) {
String msg = Messages.getMessage(key, field.getPreferredName(), dataSource.toString());
private static ElasticsearchException notSupportedValue(ParseField field, String key) {
String msg = Messages.getMessage(key, field.getPreferredName());
throw new IllegalArgumentException(msg);
}

View File

@@ -69,7 +69,6 @@ public final class Messages
public static final String JOB_CONFIG_BYFIELD_INCOMPATIBLE_FUNCTION = "job.config.byField.incompatible.function";
public static final String JOB_CONFIG_BYFIELD_NEEDS_ANOTHER = "job.config.byField.needs.another";
public static final String JOB_CONFIG_CANNOT_ENCRYPT_PASSWORD = "job.config.cannot.encrypt.password";
public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_REQUIRE_CATEGORIZATION_FIELD_NAME = "job.config.categorization.filters."
+ "require.categorization.field.name";
public static final String JOB_CONFIG_CATEGORIZATION_FILTERS_CONTAINS_DUPLICATES = "job.config.categorization.filters.contains"
@@ -164,8 +163,6 @@ public final class Messages
public static final String JOB_CONFIG_UPDATE_RESULTS_RETENTION_DAYS_INVALID = "job.config.update.results.retention.days.invalid";
public static final String JOB_CONFIG_UPDATE_SCHEDULE_CONFIG_PARSE_ERROR = "job.config.update.scheduler.config.parse.error";
public static final String JOB_CONFIG_UPDATE_SCHEDULE_CONFIG_CANNOT_BE_NULL = "job.config.update.scheduler.config.cannot.be.null";
public static final String JOB_CONFIG_UPDATE_SCHEDULE_CONFIG_DATA_SOURCE_INVALID = "job.config.update.scheduler.config.data.source."
+ "invalid";
public static final String JOB_CONFIG_TRANSFORM_CIRCULAR_DEPENDENCY = "job.config.transform.circular.dependency";
public static final String JOB_CONFIG_TRANSFORM_CONDITION_REQUIRED = "job.config.transform.condition.required";
@@ -184,7 +181,6 @@ public final class Messages
public static final String JOB_CONFIG_TRANSFORM_UNKNOWN_TYPE = "job.config.transform.unknown.type";
public static final String JOB_CONFIG_UNKNOWN_FUNCTION = "job.config.unknown.function";
public static final String JOB_CONFIG_SCHEDULER_UNKNOWN_DATASOURCE = "job.config.scheduler.unknown.datasource";
public static final String JOB_CONFIG_SCHEDULER_FIELD_NOT_SUPPORTED = "job.config.scheduler.field.not.supported";
public static final String JOB_CONFIG_SCHEDULER_INVALID_OPTION_VALUE = "job.config.scheduler.invalid.option.value";
public static final String JOB_CONFIG_SCHEDULER_REQUIRES_BUCKET_SPAN = "job.config.scheduler.requires.bucket.span";
@@ -194,8 +190,6 @@
+ "requires.summary.count.field";
public static final String JOB_CONFIG_SCHEDULER_ELASTICSEARCH_REQUIRES_DATAFORMAT_ELASTICSEARCH = "job.config.scheduler.elasticsearch."
+ "requires.dataformat.elasticsearch";
public static final String JOB_CONFIG_SCHEDULER_INCOMPLETE_CREDENTIALS = "job.config.scheduler.incomplete.credentials";
public static final String JOB_CONFIG_SCHEDULER_MULTIPLE_PASSWORDS = "job.config.scheduler.multiple.passwords";
public static final String JOB_CONFIG_SCHEDULER_MULTIPLE_AGGREGATIONS = "job.config.scheduler.multiple.aggregations";
public static final String JOB_DATA_CONCURRENT_USE_CLOSE = "job.data.concurrent.use.close";

View File

@@ -9,6 +9,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.stream.Collectors;
import org.elasticsearch.common.settings.Settings;
public class ElasticsearchUrlBuilder {
@@ -31,11 +32,15 @@ public class ElasticsearchUrlBuilder {
this.types = Objects.requireNonNull(types);
}
public static ElasticsearchUrlBuilder create(String baseUrl, List<String> indexes, List<String> types) {
String sanitisedBaseUrl = baseUrl.endsWith(SLASH) ? baseUrl : baseUrl + SLASH;
public static ElasticsearchUrlBuilder create(List<String> indexes, List<String> types) {
// norelease: This class will be removed once we switch to a client based data extractor
return create(indexes, types, "http://localhost:9200/");
}
public static ElasticsearchUrlBuilder create(List<String> indexes, List<String> types, String baseUrl) {
String indexesAsString = indexes.stream().collect(Collectors.joining(COMMA));
String typesAsString = types.stream().collect(Collectors.joining(COMMA));
return new ElasticsearchUrlBuilder(sanitisedBaseUrl, indexesAsString, typesAsString);
return new ElasticsearchUrlBuilder(baseUrl, indexesAsString, typesAsString);
}
public String buildIndexSettingsUrl(String index) {

View File

@@ -5,7 +5,12 @@
*/
package org.elasticsearch.xpack.prelert.job.scheduler.http;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.xpack.prelert.job.Job;
@@ -19,18 +24,16 @@ import java.util.Map;
public class HttpDataExtractorFactory implements DataExtractorFactory {
public HttpDataExtractorFactory() {}
private static final Logger LOGGER = Loggers.getLogger(HttpDataExtractorFactory.class);
private final Client client;
public HttpDataExtractorFactory(Client client) {
this.client = client;
}
@Override
public DataExtractor newExtractor(Job job) {
SchedulerConfig schedulerConfig = job.getSchedulerConfig();
if (schedulerConfig.getDataSource() == SchedulerConfig.DataSource.ELASTICSEARCH) {
return createElasticsearchDataExtractor(job);
}
throw new IllegalArgumentException();
}
private DataExtractor createElasticsearchDataExtractor(Job job) {
String timeField = job.getDataDescription().getTimeField();
SchedulerConfig schedulerConfig = job.getSchedulerConfig();
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(
@ -41,10 +44,18 @@ public class HttpDataExtractorFactory implements DataExtractorFactory {
timeField);
HttpRequester httpRequester = new HttpRequester();
ElasticsearchUrlBuilder urlBuilder = ElasticsearchUrlBuilder
.create(schedulerConfig.getBaseUrl(), schedulerConfig.getIndexes(), schedulerConfig.getTypes());
.create(schedulerConfig.getIndexes(), schedulerConfig.getTypes(), getBaseUrl());
return new ElasticsearchDataExtractor(httpRequester, urlBuilder, queryBuilder, schedulerConfig.getScrollSize());
}
private String getBaseUrl() {
NodesInfoResponse nodesInfoResponse = client.admin().cluster().prepareNodesInfo().get();
TransportAddress address = nodesInfoResponse.getNodes().get(0).getHttp().getAddress().publishAddress();
String baseUrl = "http://" + address.getAddress() + ":" + address.getPort() + "/";
LOGGER.info("Base URL: " + baseUrl);
return baseUrl;
}
String stringifyElasticsearchQuery(Map<String, Object> queryMap) {
String queryStr = writeMapAsJson(queryMap);
if (queryStr.startsWith("{") && queryStr.endsWith("}")) {

View File

@ -50,7 +50,6 @@ job.cannot.resume = Cannot resume job ''{0}'' while its status is {1}
job.config.byField.incompatible.function = by_field_name cannot be used with function ''{0}''
job.config.byField.needs.another = by_field_name must be used in conjunction with field_name or function
job.config.cannot.encrypt.password = Cannot encrypt password
job.config.categorization.filters.require.categorization.field.name = categorization_filters require setting categorization_field_name
job.config.categorization.filters.contains.duplicates = categorization_filters contain duplicates
job.config.categorization.filter.contains.empty = categorization_filters are not allowed to contain empty strings
@ -83,8 +82,8 @@ job.config.id.too.long = The job id cannot contain more than {0,number,integer}
job.config.invalid.fieldname.chars = Invalid fieldname ''{0}''. Fieldnames including over, by and partition fields cannot contain any of these characters: {1}
job.config.invalid.jobid.chars = Invalid job id; must be lowercase alphanumeric and may contain hyphens or underscores
job.config.invalid.timeformat = Invalid Time format string ''{0}''
job.config.missing.analysisconfig = Either an an AnalysisConfig or job reference id must be set
job.config.model.debug.config.invalid.bounds.percentile = Invalid modelDebugConfig: bounds_percentile must be in the range [0, 100]
job.config.missing.analysisconfig = An AnalysisConfig must be set
job.config.model.debug.config.invalid.bounds.percentile = Invalid modelDebugConfig: boundsPercentile must be in the range [0, 100]
job.config.field.value.too.low = {0} cannot be less than {1,number}. Value = {2,number}
job.config.no.analysis.field = One of function, fieldName, by_field_name or over_field_name must be set
job.config.no.analysis.field.not.count = Unless the function is 'count' one of field_name, by_field_name or over_field_name must be set
@ -116,13 +115,12 @@ job.config.update.job.is.not.closed = Cannot update key ''{0}'' while job is not
job.config.update.model.debug.config.parse.error = JSON parse error reading the update value for ModelDebugConfig
job.config.update.requires.non.empty.object = Update requires JSON that contains a non-empty object
job.config.update.parse.error = JSON parse error reading the job update
job.config.update.background.persist.interval.invalid = Invalid update value for background_persist_interval: value must be an exact number of seconds no less than 3600
job.config.update.renormalization.window.days.invalid = Invalid update value for renormalization_window_days: value must be an exact number of days
job.config.update.model.snapshot.retention.days.invalid = Invalid update value for model_snapshot_retention_days: value must be an exact number of days
job.config.update.results.retention.days.invalid = Invalid update value for results_retention_days: value must be an exact number of days
job.config.update.scheduler.config.parse.error = JSON parse error reading the update value for scheduler_config
job.config.update.scheduler.config.cannot.be.null = Invalid update value for scheduler_config: null
job.config.update.scheduler.config.data.source.invalid = Invalid update value for scheduler_config: data_source cannot be changed; existing is {0}, update had {1}
job.config.update.background.persist.interval.invalid = Invalid update value for backgroundPersistInterval: value must be an exact number of seconds no less than 3600
job.config.update.renormalization.window.days.invalid = Invalid update value for renormalizationWindowDays: value must be an exact number of days
job.config.update.model.snapshot.retention.days.invalid = Invalid update value for modelSnapshotRetentionDays: value must be an exact number of days
job.config.update.results.retention.days.invalid = Invalid update value for resultsRetentionDays: value must be an exact number of days
job.config.update.scheduler.config.parse.error = JSON parse error reading the update value for schedulerConfig
job.config.update.scheduler.config.cannot.be.null = Invalid update value for schedulerConfig: null
job.config.transform.circular.dependency = Transform type {0} with inputs {1} has a circular dependency
job.config.transform.condition.required = A condition must be defined for transform ''{0}''
@ -138,15 +136,12 @@ job.config.transform.outputs.unused = None of the outputs of transform ''{0}'' a
job.config.transform.output.name.used.more.than.once = Transform output name ''{0}'' is used more than once
job.config.transform.unknown.type = Unknown TransformType ''{0}''
job.config.unknown.function = Unknown function ''{0}''
job.config.scheduler.unknown.datasource = Unknown scheduler dataSource ''{0}''
job.config.scheduler.field.not.supported = Scheduler configuration field {0} not supported for dataSource ''{1}''
job.config.scheduler.field.not.supported = Scheduler configuration field {0} not supported
job.config.scheduler.invalid.option.value = Invalid {0} value ''{1}'' in scheduler configuration
job.config.scheduler.requires.bucket.span = A job configured with scheduler requires that bucket_span is specified
job.config.scheduler.elasticsearch.does.not.support.latency = A job configured with an Elasticsearch scheduler cannot support latency
job.config.scheduler.aggregations.requires.summary.count.field = A scheduler job with aggregations for dataSource ''{0}'' must have summary_count_field_name ''{1}''
job.config.scheduler.aggregations.requires.summary.count.field = A scheduler job with aggregations must have summaryCountFieldName ''{1}''
job.config.scheduler.elasticsearch.requires.dataformat.elasticsearch = A job configured with an Elasticsearch scheduler must have dataFormat ''ELASTICSEARCH''
job.config.scheduler.incomplete.credentials = Both username and password must be specified if either is
job.config.scheduler.multiple.passwords = Both password and encryptedPassword were specified - please just specify one
job.config.scheduler.multiple.aggregations = Both aggregations and aggs were specified - please just specify one
job.data.concurrent.use.close = Cannot close job {0} while another connection {2}is {1} the job

View File

@ -26,6 +26,7 @@ import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase;
import org.joda.time.DateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
@ -50,8 +51,7 @@ public class GetJobActionResponseTests extends AbstractStreamableTestCase<GetJob
AnalysisConfig analysisConfig = new AnalysisConfig.Builder(
Collections.singletonList(new Detector.Builder("metric", "some_field").build())).build();
AnalysisLimits analysisLimits = new AnalysisLimits(randomPositiveLong(), randomPositiveLong());
SchedulerConfig.Builder schedulerConfig = new SchedulerConfig.Builder(SchedulerConfig.DataSource.FILE);
schedulerConfig.setFilePath("/file/path");
SchedulerConfig.Builder schedulerConfig = new SchedulerConfig.Builder(Arrays.asList("myIndex"), Arrays.asList("myType"));
DataDescription dataDescription = randomBoolean() ? new DataDescription.Builder().build() : null;
int numTransformers = randomIntBetween(0, 32);
List<TransformConfig> transformConfigList = new ArrayList<>(numTransformers);

View File

@ -150,13 +150,11 @@ public class ScheduledJobsIT extends ESIntegTestCase {
}
private Job.Builder createJob() {
SchedulerConfig.Builder scheduler = new SchedulerConfig.Builder(SchedulerConfig.DataSource.ELASTICSEARCH);
SchedulerConfig.Builder scheduler = new SchedulerConfig.Builder(Collections.singletonList("data"),
Collections.singletonList("type"));
scheduler.setQueryDelay(1);
scheduler.setFrequency(2);
InetSocketAddress address = cluster().httpAddresses()[0];
scheduler.setBaseUrl("http://" + NetworkAddress.format(address.getAddress()) + ":" + address.getPort());
scheduler.setIndexes(Collections.singletonList("data"));
scheduler.setTypes(Collections.singletonList("type"));
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.ELASTICSEARCH);

View File

@ -138,13 +138,13 @@ public class ScheduledJobIT extends ESRestTestCase {
private Response createScheduledJob(String id) throws Exception {
HttpHost httpHost = getClusterHosts().get(0);
logger.info("Http host = " + httpHost.toURI());
String job = "{\n" + " \"job_id\":\"" + id + "\",\n" + " \"description\":\"Analysis of response time by airline\",\n"
+ " \"analysis_config\" : {\n" + " \"bucket_span\":3600,\n"
+ " \"detectors\" :[{\"function\":\"mean\",\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]\n"
+ " },\n" + " \"data_description\" : {\n" + " \"format\":\"ELASTICSEARCH\",\n"
+ " \"time_field\":\"time\",\n" + " \"time_format\":\"yyyy-MM-dd'T'HH:mm:ssX\"\n" + " },\n"
+ " \"scheduler_config\" : {\n" + " \"data_source\":\"ELASTICSEARCH\",\n"
+ " \"base_url\":\"" + httpHost.toURI() + "\",\n" + " \"indexes\":[\"airline-data\"],\n"
+ " \"scheduler_config\" : {\n" + " \"indexes\":[\"airline-data\"],\n"
+ " \"types\":[\"response\"],\n" + " \"retrieve_whole_source\":true\n" + " }\n" + "}";
return client().performRequest("put", PrelertPlugin.BASE_PATH + "jobs", Collections.emptyMap(), new StringEntity(job));

View File

@ -11,7 +11,6 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.SchedulerConfig.DataSource;
import org.elasticsearch.xpack.prelert.job.condition.Condition;
import org.elasticsearch.xpack.prelert.job.condition.Operator;
import org.elasticsearch.xpack.prelert.job.messages.Messages;
@ -80,9 +79,17 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
assertEquals(IgnoreDowntime.ONCE, job.getIgnoreDowntime());
}
public void testConstructor_GivenJobConfigurationWithElasticsearchScheduler_ShouldFillDefaults() {
SchedulerConfig.Builder schedulerConfig = new SchedulerConfig.Builder(DataSource.ELASTICSEARCH);
expectThrows(NullPointerException.class, () -> schedulerConfig.setQuery(null));
public void testConstructor_GivenJobConfigurationWithScheduler_ShouldFillDefaults() {
Job.Builder builder = new Job.Builder("foo");
DataDescription.Builder dataDescriptionBuilder = new DataDescription.Builder();
dataDescriptionBuilder.setFormat(DataDescription.DataFormat.ELASTICSEARCH);
builder.setDataDescription(dataDescriptionBuilder);
Detector.Builder detectorBuilder = new Detector.Builder();
detectorBuilder.setFunction("count");
builder.setAnalysisConfig(new AnalysisConfig.Builder(Arrays.asList(detectorBuilder.build())));
builder.setSchedulerConfig(new SchedulerConfig.Builder(Arrays.asList("my_index"), Arrays.asList("my_type")));
assertEquals(60L, builder.build().getSchedulerConfig().getQueryDelay().longValue());
}
public void testEquals_noId() {
@ -470,9 +477,8 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
}
public void testVerify_GivenElasticsearchSchedulerWithAggsAndNoSummaryCountField() throws IOException {
String errorMessage = Messages.getMessage(
Messages.JOB_CONFIG_SCHEDULER_AGGREGATIONS_REQUIRES_SUMMARY_COUNT_FIELD,
DataSource.ELASTICSEARCH.toString(), SchedulerConfig.DOC_COUNT);
String errorMessage = Messages.getMessage(Messages.JOB_CONFIG_SCHEDULER_AGGREGATIONS_REQUIRES_SUMMARY_COUNT_FIELD,
SchedulerConfig.DOC_COUNT);
SchedulerConfig.Builder schedulerConfig = createValidElasticsearchSchedulerConfigWithAggs();
Job.Builder builder = buildJobBuilder("foo");
builder.setSchedulerConfig(schedulerConfig);
@ -489,8 +495,7 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
public void testVerify_GivenElasticsearchSchedulerWithAggsAndWrongSummaryCountField() throws IOException {
String errorMessage = Messages.getMessage(
Messages.JOB_CONFIG_SCHEDULER_AGGREGATIONS_REQUIRES_SUMMARY_COUNT_FIELD,
DataSource.ELASTICSEARCH.toString(), SchedulerConfig.DOC_COUNT);
Messages.JOB_CONFIG_SCHEDULER_AGGREGATIONS_REQUIRES_SUMMARY_COUNT_FIELD, SchedulerConfig.DOC_COUNT);
SchedulerConfig.Builder schedulerConfig = createValidElasticsearchSchedulerConfigWithAggs();
Job.Builder builder = buildJobBuilder("foo");
builder.setSchedulerConfig(schedulerConfig);
@ -516,11 +521,7 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
}
private static SchedulerConfig.Builder createValidElasticsearchSchedulerConfig() {
SchedulerConfig.Builder schedulerConfig = new SchedulerConfig.Builder(DataSource.ELASTICSEARCH);
schedulerConfig.setBaseUrl("http://localhost:9200");
schedulerConfig.setIndexes(Arrays.asList("myIndex"));
schedulerConfig.setTypes(Arrays.asList("myType"));
return schedulerConfig;
return new SchedulerConfig.Builder(Arrays.asList("myIndex"), Arrays.asList("myType"));
}
private static SchedulerConfig.Builder createValidElasticsearchSchedulerConfigWithAggs()
@ -588,12 +589,16 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
analysisConfig.setBucketSpan(100L);
builder.setAnalysisConfig(analysisConfig);
builder.setAnalysisLimits(new AnalysisLimits(randomPositiveLong(), randomPositiveLong()));
SchedulerConfig.Builder schedulerConfig = new SchedulerConfig.Builder(SchedulerConfig.DataSource.FILE);
schedulerConfig.setFilePath("/file/path");
builder.setSchedulerConfig(schedulerConfig);
if (randomBoolean()) {
builder.setDataDescription(new DataDescription.Builder());
}
if (randomBoolean()) {
SchedulerConfig.Builder schedulerConfig = new SchedulerConfig.Builder(randomStringList(1, 10), randomStringList(1, 10));
builder.setSchedulerConfig(schedulerConfig);
DataDescription.Builder dataDescriptionBuilder = new DataDescription.Builder();
dataDescriptionBuilder.setFormat(DataDescription.DataFormat.ELASTICSEARCH);
builder.setDataDescription(dataDescriptionBuilder);
}
String[] outputs;
TransformType[] transformTypes ;
AnalysisConfig ac = analysisConfig.build();
@ -636,4 +641,13 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
}
return builder.build();
}
private static List<String> randomStringList(int min, int max) {
int size = scaledRandomIntBetween(min, max);
List<String> list = new ArrayList<>();
for (int i = 0; i < size; i++) {
list.add(randomAsciiOfLength(10));
}
return list;
}
}

View File

@ -12,7 +12,6 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.job.SchedulerConfig.DataSource;
import org.elasticsearch.xpack.prelert.job.messages.Messages;
import org.elasticsearch.xpack.prelert.support.AbstractSerializingTestCase;
@ -28,45 +27,23 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerC
@Override
protected SchedulerConfig createTestInstance() {
DataSource dataSource = randomFrom(DataSource.values());
SchedulerConfig.Builder builder = new SchedulerConfig.Builder(dataSource);
switch (dataSource) {
case FILE:
builder.setFilePath(randomAsciiOfLength(10));
builder.setTailFile(randomBoolean());
break;
case ELASTICSEARCH:
builder.setBaseUrl("http://localhost/" + randomAsciiOfLength(10));
if (randomBoolean()) {
builder.setQuery(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)));
}
boolean retrieveWholeSource = randomBoolean();
if (retrieveWholeSource) {
builder.setRetrieveWholeSource(randomBoolean());
} else if (randomBoolean()) {
builder.setScriptFields(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)));
}
if (randomBoolean()) {
builder.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE));
}
if (randomBoolean()) {
builder.setUsername(randomAsciiOfLength(10));
if (randomBoolean()) {
builder.setEncryptedPassword(randomAsciiOfLength(10));
} else {
builder.setPassword(randomAsciiOfLength(10));
}
}
builder.setIndexes(randomStringList(1, 10));
builder.setTypes(randomStringList(1, 10));
if (randomBoolean()) {
builder.setAggregations(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)));
} else if (randomBoolean()) {
builder.setAggs(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)));
}
break;
default:
throw new UnsupportedOperationException();
SchedulerConfig.Builder builder = new SchedulerConfig.Builder(randomStringList(1, 10), randomStringList(1, 10));
if (randomBoolean()) {
builder.setQuery(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)));
}
boolean retrieveWholeSource = randomBoolean();
if (retrieveWholeSource) {
builder.setRetrieveWholeSource(randomBoolean());
} else if (randomBoolean()) {
builder.setScriptFields(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)));
}
if (randomBoolean()) {
builder.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE));
}
if (randomBoolean()) {
builder.setAggregations(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)));
} else if (randomBoolean()) {
builder.setAggs(Collections.singletonMap(randomAsciiOfLength(10), randomAsciiOfLength(10)));
}
if (randomBoolean()) {
builder.setFrequency(randomPositiveLong());
@ -102,8 +79,8 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerC
public void testAnalysisConfigRequiredFields() throws IOException {
Logger logger = Loggers.getLogger(SchedulerConfigTests.class);
String jobConfigStr = "{" + "\"job_id\":\"farequote\"," + "\"scheduler_config\" : {" + "\"data_source\":\"ELASTICSEARCH\","
+ "\"base_url\":\"http://localhost:9200/\"," + "\"indexes\":[\"farequote\"]," + "\"types\":[\"farequote\"],"
String jobConfigStr = "{" + "\"job_id\":\"farequote\"," + "\"scheduler_config\" : {"
+ "\"indexes\":[\"farequote\"]," + "\"types\":[\"farequote\"],"
+ "\"query\":{\"match_all\":{} }" + "}," + "\"analysis_config\" : {" + "\"bucket_span\":3600,"
+ "\"detectors\" :[{\"function\":\"metric\",\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}],"
+ "\"influencers\" :[\"airline\"]" + "}," + "\"data_description\" : {" + "\"format\":\"ELASTICSEARCH\","
@ -125,10 +102,7 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerC
}
public void testBuildAggregatedFieldList_GivenNoAggregations() {
SchedulerConfig.Builder builder = new SchedulerConfig.Builder(DataSource.ELASTICSEARCH);
builder.setIndexes(Arrays.asList("index"));
builder.setTypes(Arrays.asList("type"));
builder.setBaseUrl("http://localhost/");
SchedulerConfig.Builder builder = new SchedulerConfig.Builder(Arrays.asList("index"), Arrays.asList("type"));
assertTrue(builder.build().buildAggregatedFieldList().isEmpty());
}
@ -139,8 +113,8 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerC
public void testAggsParse() throws IOException {
Logger logger = Loggers.getLogger(SchedulerConfigTests.class);
String jobConfigStr = "{" + "\"job_id\":\"farequote\"," + "\"scheduler_config\" : {" + "\"data_source\":\"ELASTICSEARCH\","
+ "\"base_url\":\"http://localhost:9200/\"," + "\"indexes\":[\"farequote\"]," + "\"types\":[\"farequote\"],"
String jobConfigStr = "{" + "\"job_id\":\"farequote\"," + "\"scheduler_config\" : {"
+ "\"indexes\":[\"farequote\"]," + "\"types\":[\"farequote\"],"
+ "\"query\":{\"match_all\":{} }," + "\"aggs\" : {" + "\"top_level_must_be_time\" : {" + "\"histogram\" : {"
+ "\"field\" : \"@timestamp\"," + "\"interval\" : 3600000" + "}," + "\"aggs\" : {" + "\"by_field_in_the_middle\" : { "
+ "\"terms\" : {" + "\"field\" : \"airline\"," + "\"size\" : 0" + "}," + "\"aggs\" : {" + "\"stats_last\" : {"
@ -171,64 +145,41 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerC
assertEquals("responsetime", aggregatedFieldList.get(2));
}
public void testFillDefaults_GivenDataSourceIsFile() {
SchedulerConfig.Builder schedulerConfig = new SchedulerConfig.Builder(DataSource.FILE);
schedulerConfig.setFilePath("/some/path");
SchedulerConfig.Builder expectedSchedulerConfig = new SchedulerConfig.Builder(DataSource.FILE);
expectedSchedulerConfig.setFilePath("/some/path");
expectedSchedulerConfig.setTailFile(false);
assertEquals(expectedSchedulerConfig.build(), schedulerConfig.build());
}
public void testFillDefaults_GivenDataSourceIsElasticsearchAndNothingToFill() {
SchedulerConfig.Builder originalSchedulerConfig = new SchedulerConfig.Builder(DataSource.ELASTICSEARCH);
originalSchedulerConfig.setBaseUrl("http://localhost:9200/");
public void testFillDefaults_GivenNothingToFill() {
SchedulerConfig.Builder originalSchedulerConfig = new SchedulerConfig.Builder(Arrays.asList("index"), Arrays.asList("type"));
originalSchedulerConfig.setQuery(new HashMap<>());
originalSchedulerConfig.setQueryDelay(30L);
originalSchedulerConfig.setRetrieveWholeSource(true);
originalSchedulerConfig.setScrollSize(2000);
originalSchedulerConfig.setIndexes(Arrays.asList("index"));
originalSchedulerConfig.setTypes(Arrays.asList("type"));
SchedulerConfig.Builder defaultedSchedulerConfig = new SchedulerConfig.Builder(DataSource.ELASTICSEARCH);
defaultedSchedulerConfig.setBaseUrl("http://localhost:9200/");
SchedulerConfig.Builder defaultedSchedulerConfig = new SchedulerConfig.Builder(Arrays.asList("index"), Arrays.asList("type"));
defaultedSchedulerConfig.setQuery(new HashMap<>());
defaultedSchedulerConfig.setQueryDelay(30L);
defaultedSchedulerConfig.setRetrieveWholeSource(true);
defaultedSchedulerConfig.setScrollSize(2000);
defaultedSchedulerConfig.setIndexes(Arrays.asList("index"));
defaultedSchedulerConfig.setTypes(Arrays.asList("type"));
assertEquals(originalSchedulerConfig.build(), defaultedSchedulerConfig.build());
}
public void testFillDefaults_GivenDataSourceIsElasticsearchAndDefaultsAreApplied() {
SchedulerConfig.Builder expectedSchedulerConfig = new SchedulerConfig.Builder(DataSource.ELASTICSEARCH);
expectedSchedulerConfig.setIndexes(Arrays.asList("index"));
expectedSchedulerConfig.setTypes(Arrays.asList("type"));
expectedSchedulerConfig.setBaseUrl("http://localhost:9200/");
SchedulerConfig.Builder expectedSchedulerConfig = new SchedulerConfig.Builder(Arrays.asList("index"), Arrays.asList("type"));
Map<String, Object> defaultQuery = new HashMap<>();
defaultQuery.put("match_all", new HashMap<String, Object>());
expectedSchedulerConfig.setQuery(defaultQuery);
expectedSchedulerConfig.setQueryDelay(60L);
expectedSchedulerConfig.setRetrieveWholeSource(false);
expectedSchedulerConfig.setScrollSize(1000);
SchedulerConfig.Builder defaultedSchedulerConfig = new SchedulerConfig.Builder(DataSource.ELASTICSEARCH);
defaultedSchedulerConfig.setBaseUrl("http://localhost:9200/");
defaultedSchedulerConfig.setIndexes(Arrays.asList("index"));
defaultedSchedulerConfig.setTypes(Arrays.asList("type"));
SchedulerConfig.Builder defaultedSchedulerConfig = new SchedulerConfig.Builder(Arrays.asList("index"), Arrays.asList("type"));
assertEquals(expectedSchedulerConfig.build(), defaultedSchedulerConfig.build());
}
public void testEquals_GivenDifferentClass() {
SchedulerConfig.Builder builder = new SchedulerConfig.Builder(DataSource.FILE);
builder.setFilePath("path");
SchedulerConfig.Builder builder = new SchedulerConfig.Builder(Arrays.asList("index"), Arrays.asList("type"));
assertFalse(builder.build().equals("a string"));
}
public void testEquals_GivenSameRef() {
SchedulerConfig.Builder builder = new SchedulerConfig.Builder(DataSource.FILE);
builder.setFilePath("/some/path");
SchedulerConfig.Builder builder = new SchedulerConfig.Builder(Arrays.asList("index"), Arrays.asList("type"));
SchedulerConfig schedulerConfig = builder.build();
assertTrue(schedulerConfig.equals(schedulerConfig));
}
@ -244,17 +195,6 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerC
assertEquals(sc1.hashCode(), sc2.hashCode());
}
public void testEquals_GivenDifferentBaseUrl() {
SchedulerConfig.Builder b1 = createFullyPopulated();
SchedulerConfig.Builder b2 = createFullyPopulated();
b2.setBaseUrl("http://localhost:8081");
SchedulerConfig sc1 = b1.build();
SchedulerConfig sc2 = b2.build();
assertFalse(sc1.equals(sc2));
assertFalse(sc2.equals(sc1));
}
public void testEquals_GivenDifferentQueryDelay() {
SchedulerConfig.Builder b1 = createFullyPopulated();
SchedulerConfig.Builder b2 = createFullyPopulated();
@ -289,18 +229,18 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerC
}
public void testEquals_GivenDifferentIndexes() {
SchedulerConfig.Builder sc1 = createFullyPopulated();
SchedulerConfig.Builder sc2 = createFullyPopulated();
sc2.setIndexes(Arrays.asList("thisOtherCrazyIndex"));
SchedulerConfig.Builder sc1 = new SchedulerConfig.Builder(Arrays.asList("myIndex"), Arrays.asList("myType1", "myType2"));
SchedulerConfig.Builder sc2 = new SchedulerConfig.Builder(Arrays.asList("thisOtherCrazyIndex"),
Arrays.asList("myType1", "myType2"));
assertFalse(sc1.build().equals(sc2.build()));
assertFalse(sc2.build().equals(sc1.build()));
}
public void testEquals_GivenDifferentTypes() {
SchedulerConfig.Builder sc1 = createFullyPopulated();
SchedulerConfig.Builder sc2 = createFullyPopulated();
sc2.setTypes(Arrays.asList("thisOtherCrazyType"));
SchedulerConfig.Builder sc1 = new SchedulerConfig.Builder(Arrays.asList("myIndex"), Arrays.asList("myType1", "myType2"));
SchedulerConfig.Builder sc2 = new SchedulerConfig.Builder(Arrays.asList("thisOtherCrazyIndex"),
Arrays.asList("thisOtherCrazyType", "myType2"));
assertFalse(sc1.build().equals(sc2.build()));
assertFalse(sc2.build().equals(sc1.build()));
@ -329,12 +269,9 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerC
}
private static SchedulerConfig.Builder createFullyPopulated() {
SchedulerConfig.Builder sc = new SchedulerConfig.Builder(DataSource.ELASTICSEARCH);
sc.setBaseUrl("http://localhost:8080");
SchedulerConfig.Builder sc = new SchedulerConfig.Builder(Arrays.asList("myIndex"), Arrays.asList("myType1", "myType2"));
sc.setFrequency(60L);
sc.setScrollSize(5000);
sc.setIndexes(Arrays.asList("myIndex"));
sc.setTypes(Arrays.asList("myType1", "myType2"));
Map<String, Object> query = new HashMap<>();
query.put("foo", new HashMap<>());
sc.setQuery(query);
@ -345,39 +282,9 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerC
return sc;
}
public void testCheckValidFile_AllOk() {
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(DataSource.FILE);
conf.setFilePath("myfile.csv");
conf.build();
}
public void testCheckValidFile_NoPath() {
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(DataSource.FILE);
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, conf::build);
assertEquals(Messages.getMessage(Messages.JOB_CONFIG_SCHEDULER_INVALID_OPTION_VALUE, "file_path", "null"), e.getMessage());
}
public void testCheckValidFile_EmptyPath() {
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(DataSource.FILE);
conf.setFilePath("");
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, conf::build);
assertEquals(Messages.getMessage(Messages.JOB_CONFIG_SCHEDULER_INVALID_OPTION_VALUE, "file_path", ""), e.getMessage());
}
public void testCheckValidFile_InappropriateField() {
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(DataSource.FILE);
conf.setFilePath("myfile.csv");
conf.setBaseUrl("http://localhost:9200/");
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, conf::build);
assertEquals(Messages.getMessage(Messages.JOB_CONFIG_SCHEDULER_FIELD_NOT_SUPPORTED, "base_url", DataSource.FILE), e.getMessage());
}
public void testCheckValidElasticsearch_AllOk() throws IOException {
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(DataSource.ELASTICSEARCH);
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(Arrays.asList("myindex"), Arrays.asList("mytype"));
conf.setQueryDelay(90L);
conf.setBaseUrl("http://localhost:9200/");
conf.setIndexes(Arrays.asList("myindex"));
conf.setTypes(Arrays.asList("mytype"));
String json = "{ \"match_all\" : {} }";
XContentParser parser = XContentFactory.xContent(json).createParser(json);
conf.setQuery(parser.map());
@ -385,87 +292,13 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerC
conf.build();
}
public void testCheckValidElasticsearch_WithUsernameAndPassword() throws IOException {
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(DataSource.ELASTICSEARCH);
conf.setQueryDelay(90L);
conf.setBaseUrl("http://localhost:9200/");
conf.setIndexes(Arrays.asList("myindex"));
conf.setTypes(Arrays.asList("mytype"));
conf.setUsername("dave");
conf.setPassword("secret");
String json = "{ \"match_all\" : {} }";
XContentParser parser = XContentFactory.xContent(json).createParser(json);
conf.setQuery(parser.map());
SchedulerConfig schedulerConfig = conf.build();
assertEquals("dave", schedulerConfig.getUsername());
assertEquals("secret", schedulerConfig.getPassword());
}
public void testCheckValidElasticsearch_WithUsernameAndEncryptedPassword() throws IOException {
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(DataSource.ELASTICSEARCH);
conf.setQueryDelay(90L);
conf.setBaseUrl("http://localhost:9200/");
conf.setIndexes(Arrays.asList("myindex"));
conf.setTypes(Arrays.asList("mytype"));
conf.setUsername("dave");
conf.setEncryptedPassword("already_encrypted");
String json = "{ \"match_all\" : {} }";
XContentParser parser = XContentFactory.xContent(json).createParser(json);
conf.setQuery(parser.map());
SchedulerConfig schedulerConfig = conf.build();
assertEquals("dave", schedulerConfig.getUsername());
assertEquals("already_encrypted", schedulerConfig.getEncryptedPassword());
}
public void testCheckValidElasticsearch_WithPasswordNoUsername() {
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(DataSource.ELASTICSEARCH);
conf.setBaseUrl("http://localhost:9200/");
conf.setIndexes(Arrays.asList("myindex"));
conf.setTypes(Arrays.asList("mytype"));
conf.setPassword("secret");
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, conf::build);
assertEquals(Messages.getMessage(Messages.JOB_CONFIG_SCHEDULER_INCOMPLETE_CREDENTIALS), e.getMessage());
}
public void testCheckValidElasticsearch_BothPasswordAndEncryptedPassword() {
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(DataSource.ELASTICSEARCH);
conf.setBaseUrl("http://localhost:9200/");
conf.setIndexes(Arrays.asList("myindex"));
conf.setTypes(Arrays.asList("mytype"));
conf.setUsername("dave");
conf.setPassword("secret");
conf.setEncryptedPassword("already_encrypted");
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, conf::build);
assertEquals(Messages.getMessage(Messages.JOB_CONFIG_SCHEDULER_MULTIPLE_PASSWORDS), e.getMessage());
}
public void testCheckValidElasticsearch_NoQuery() {
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(DataSource.ELASTICSEARCH);
conf.setBaseUrl("http://localhost:9200/");
conf.setIndexes(Arrays.asList("myindex"));
conf.setTypes(Arrays.asList("mytype"));
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(Arrays.asList("myindex"), Arrays.asList("mytype"));
assertEquals(Collections.singletonMap("match_all", new HashMap<>()), conf.build().getQuery());
}
public void testCheckValidElasticsearch_InappropriateField() throws IOException {
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(DataSource.ELASTICSEARCH);
conf.setBaseUrl("http://localhost:9200/");
conf.setIndexes(Arrays.asList("myindex"));
conf.setTypes(Arrays.asList("mytype"));
String json = "{ \"match_all\" : {} }";
XContentParser parser = XContentFactory.xContent(json).createParser(json);
conf.setQuery(parser.map());
conf.setTailFile(true);
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, conf::build);
assertEquals(Messages.getMessage(Messages.JOB_CONFIG_SCHEDULER_FIELD_NOT_SUPPORTED, "tail_file", DataSource.ELASTICSEARCH),
e.getMessage());
}
public void testCheckValidElasticsearch_GivenScriptFieldsNotWholeSource() throws IOException {
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(DataSource.ELASTICSEARCH);
conf.setBaseUrl("http://localhost:9200/");
conf.setIndexes(Arrays.asList("myindex"));
conf.setTypes(Arrays.asList("mytype"));
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(Arrays.asList("myindex"), Arrays.asList("mytype"));
String json = "{ \"twiceresponsetime\" : { \"script\" : { \"lang\" : \"expression\", "
+ "\"inline\" : \"doc['responsetime'].value * 2\" } } }";
XContentParser parser = XContentFactory.xContent(json).createParser(json);
@ -475,10 +308,7 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerC
}
public void testCheckValidElasticsearch_GivenScriptFieldsAndWholeSource() throws IOException {
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(DataSource.ELASTICSEARCH);
conf.setBaseUrl("http://localhost:9200/");
conf.setIndexes(Arrays.asList("myindex"));
conf.setTypes(Arrays.asList("mytype"));
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(Arrays.asList("myindex"), Arrays.asList("mytype"));
String json = "{ \"twiceresponsetime\" : { \"script\" : { \"lang\" : \"expression\", "
+ "\"inline\" : \"doc['responsetime'].value * 2\" } } }";
XContentParser parser = XContentFactory.xContent(json).createParser(json);
@ -488,16 +318,11 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerC
}
public void testCheckValidElasticsearch_GivenNullIndexes() throws IOException {
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(DataSource.ELASTICSEARCH);
conf.setBaseUrl("http://localhost:9200/");
expectThrows(NullPointerException.class, () -> conf.setIndexes(null));
expectThrows(NullPointerException.class, () -> new SchedulerConfig.Builder(null, Arrays.asList("mytype")));
}
public void testCheckValidElasticsearch_GivenEmptyIndexes() throws IOException {
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(DataSource.ELASTICSEARCH);
conf.setBaseUrl("http://localhost:9200/");
conf.setIndexes(Collections.emptyList());
conf.setTypes(Arrays.asList("mytype"));
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(Collections.emptyList(), Arrays.asList("mytype"));
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, conf::build);
assertEquals(Messages.getMessage(Messages.JOB_CONFIG_SCHEDULER_INVALID_OPTION_VALUE, "indexes", "[]"), e.getMessage());
}
@ -506,10 +331,7 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerC
List<String> indexes = new ArrayList<>();
indexes.add(null);
indexes.add(null);
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(DataSource.ELASTICSEARCH);
conf.setBaseUrl("http://localhost:9200/");
conf.setIndexes(indexes);
conf.setTypes(Arrays.asList("mytype"));
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(indexes, Arrays.asList("mytype"));
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, conf::build);
assertEquals(Messages.getMessage(Messages.JOB_CONFIG_SCHEDULER_INVALID_OPTION_VALUE, "indexes", "[null, null]"), e.getMessage());
}
@ -518,44 +340,38 @@ public class SchedulerConfigTests extends AbstractSerializingTestCase<SchedulerC
List<String> indexes = new ArrayList<>();
indexes.add("");
indexes.add("");
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(DataSource.ELASTICSEARCH);
conf.setBaseUrl("http://localhost:9200/");
conf.setIndexes(indexes);
conf.setTypes(Arrays.asList("mytype"));
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(indexes, Arrays.asList("mytype"));
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, conf::build);
assertEquals(Messages.getMessage(Messages.JOB_CONFIG_SCHEDULER_INVALID_OPTION_VALUE, "indexes", "[, ]"), e.getMessage());
}
public void testCheckValidElasticsearch_GivenNegativeQueryDelay() throws IOException {
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(DataSource.ELASTICSEARCH);
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(Arrays.asList("myindex"), Arrays.asList("mytype"));
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, () -> conf.setQueryDelay(-10L));
assertEquals(Messages.getMessage(Messages.JOB_CONFIG_SCHEDULER_INVALID_OPTION_VALUE, "query_delay", -10L), e.getMessage());
}
public void testCheckValidElasticsearch_GivenZeroFrequency() throws IOException {
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(DataSource.ELASTICSEARCH);
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(Arrays.asList("myindex"), Arrays.asList("mytype"));
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, () -> conf.setFrequency(0L));
assertEquals(Messages.getMessage(Messages.JOB_CONFIG_SCHEDULER_INVALID_OPTION_VALUE, "frequency", 0L), e.getMessage());
}
public void testCheckValidElasticsearch_GivenNegativeFrequency() throws IOException {
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(DataSource.ELASTICSEARCH);
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(Arrays.asList("myindex"), Arrays.asList("mytype"));
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, () -> conf.setFrequency(-600L));
assertEquals(Messages.getMessage(Messages.JOB_CONFIG_SCHEDULER_INVALID_OPTION_VALUE, "frequency", -600L), e.getMessage());
}
public void testCheckValidElasticsearch_GivenNegativeScrollSize() throws IOException {
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(DataSource.ELASTICSEARCH);
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(Arrays.asList("myindex"), Arrays.asList("mytype"));
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, () -> conf.setScrollSize(-1000));
assertEquals(Messages.getMessage(Messages.JOB_CONFIG_SCHEDULER_INVALID_OPTION_VALUE, "scroll_size", -1000L), e.getMessage());
}
public void testCheckValidElasticsearch_GivenBothAggregationsAndAggsAreSet() {
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(DataSource.ELASTICSEARCH);
SchedulerConfig.Builder conf = new SchedulerConfig.Builder(Arrays.asList("myindex"), Arrays.asList("mytype"));
conf.setScrollSize(1000);
conf.setBaseUrl("http://localhost:9200/");
conf.setIndexes(Arrays.asList("myIndex"));
conf.setTypes(Arrays.asList("mytype"));
Map<String, Object> aggs = new HashMap<>();
conf.setAggregations(aggs);
conf.setAggs(aggs);

View File

@ -184,10 +184,8 @@ public class JobAllocatorTests extends ESTestCase {
public void testScheduledJobHasDefaultSchedulerState() {
PrelertMetadata.Builder pmBuilder = new PrelertMetadata.Builder();
SchedulerConfig.Builder schedulerConfigBuilder = new SchedulerConfig.Builder(SchedulerConfig.DataSource.ELASTICSEARCH);
schedulerConfigBuilder.setBaseUrl("http://server");
schedulerConfigBuilder.setIndexes(Collections.singletonList("foo"));
schedulerConfigBuilder.setTypes(Collections.singletonList("bar"));
SchedulerConfig.Builder schedulerConfigBuilder = new SchedulerConfig.Builder(Collections.singletonList("foo"),
Collections.singletonList("bar"));
Job.Builder jobBuilder = buildJobBuilder("_job_id");
jobBuilder.setSchedulerConfig(schedulerConfigBuilder);

View File

@ -108,7 +108,6 @@ public class ElasticsearchMappingsTests extends ESTestCase {
overridden.add(Job.DESCRIPTION.getPreferredName());
overridden.add(Allocation.STATUS.getPreferredName());
overridden.add(ModelSnapshot.DESCRIPTION.getPreferredName());
overridden.add(SchedulerConfig.USERNAME.getPreferredName());
Set<String> expected = new HashSet<>();

View File

@ -39,19 +39,19 @@ import java.util.Date;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
import static org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction.Request;
import static org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction.INSTANCE;
import static org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction.Request;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ScheduledJobServiceTests extends ESTestCase {
@ -174,10 +174,7 @@ public class ScheduledJobServiceTests extends ESTestCase {
acBuilder.setBucketSpan(3600L);
acBuilder.setDetectors(Arrays.asList(new Detector.Builder("metric", "field").build()));
SchedulerConfig.Builder schedulerConfig = new SchedulerConfig.Builder(SchedulerConfig.DataSource.ELASTICSEARCH);
schedulerConfig.setBaseUrl("http://localhost");
schedulerConfig.setIndexes(Arrays.asList("myIndex"));
schedulerConfig.setTypes(Arrays.asList("myType"));
SchedulerConfig.Builder schedulerConfig = new SchedulerConfig.Builder(Arrays.asList("myIndex"), Arrays.asList("myType"));
Job.Builder builder = new Job.Builder("foo");
builder.setAnalysisConfig(acBuilder);

View File

@ -689,7 +689,7 @@ public class ElasticsearchDataExtractorTests extends ESTestCase {
private void createExtractor(MockHttpRequester httpRequester) {
ElasticsearchQueryBuilder queryBuilder = new ElasticsearchQueryBuilder(SEARCH, aggregations, scriptFields, fields, TIME_FIELD);
ElasticsearchUrlBuilder urlBuilder = ElasticsearchUrlBuilder.create(BASE_URL, INDEXES, TYPES);
ElasticsearchUrlBuilder urlBuilder = ElasticsearchUrlBuilder.create(INDEXES, TYPES);
extractor = new ElasticsearchDataExtractor(httpRequester, urlBuilder, queryBuilder, 1000);
}

View File

@ -13,49 +13,48 @@ import java.util.List;
public class ElasticsearchUrlBuilderTests extends ESTestCase {
private static final String BASE_URL = "http://localhost:9200";
private static final List<String> SINGLE_INDEX = Arrays.asList("foo-*");
private static final List<String> TWO_INDEXES = Arrays.asList("index_1", "index_2");
private static final List<String> EMPTY_TYPES = Collections.emptyList();
private static final List<String> TWO_TYPES = Arrays.asList("type_1", "type_2");
public void testBuildIndexSettingsUrl() {
String url = ElasticsearchUrlBuilder.create(BASE_URL, SINGLE_INDEX, TWO_TYPES).buildIndexSettingsUrl("foo");
String url = ElasticsearchUrlBuilder.create(SINGLE_INDEX, TWO_TYPES).buildIndexSettingsUrl("foo");
assertEquals("http://localhost:9200/foo/_settings", url);
}
public void testBuildInitScrollUrl_GivenMultipleIndicesAndTypes() {
String url = ElasticsearchUrlBuilder.create(BASE_URL, TWO_INDEXES, TWO_TYPES).buildInitScrollUrl(5000);
String url = ElasticsearchUrlBuilder.create(TWO_INDEXES, TWO_TYPES).buildInitScrollUrl(5000);
assertEquals("http://localhost:9200/index_1,index_2/type_1,type_2/_search?scroll=60m&size=5000", url);
}
public void testBuildContinueScrollUrl() {
String url = ElasticsearchUrlBuilder.create(BASE_URL, SINGLE_INDEX, TWO_TYPES).buildContinueScrollUrl();
String url = ElasticsearchUrlBuilder.create(SINGLE_INDEX, TWO_TYPES).buildContinueScrollUrl();
assertEquals("http://localhost:9200/_search/scroll?scroll=60m", url);
}
public void testBuildClearScrollUrl() {
String url = ElasticsearchUrlBuilder.create(BASE_URL, SINGLE_INDEX, TWO_TYPES).buildClearScrollUrl();
String url = ElasticsearchUrlBuilder.create(SINGLE_INDEX, TWO_TYPES).buildClearScrollUrl();
assertEquals("http://localhost:9200/_search/scroll", url);
}
public void testBuildSearchSizeOneUrl_GivenMultipleIndicesAndTypes() {
String url = ElasticsearchUrlBuilder.create(BASE_URL, TWO_INDEXES, TWO_TYPES).buildSearchSizeOneUrl();
String url = ElasticsearchUrlBuilder.create(TWO_INDEXES, TWO_TYPES).buildSearchSizeOneUrl();
assertEquals("http://localhost:9200/index_1,index_2/type_1,type_2/_search?size=1", url);
}
public void testBuildSearchSizeOneUrl_GivenMultipleIndicesAndEmptyTypes() {
String url = ElasticsearchUrlBuilder.create(BASE_URL, TWO_INDEXES, EMPTY_TYPES).buildSearchSizeOneUrl();
String url = ElasticsearchUrlBuilder.create(TWO_INDEXES, EMPTY_TYPES).buildSearchSizeOneUrl();
assertEquals("http://localhost:9200/index_1,index_2/_search?size=1", url);
}
public void testGetBaseUrl_GivenNoEndingSlash() {
String url = ElasticsearchUrlBuilder.create("http://localhost:9200", SINGLE_INDEX, TWO_TYPES).getBaseUrl();
String url = ElasticsearchUrlBuilder.create(SINGLE_INDEX, TWO_TYPES).getBaseUrl();
assertEquals("http://localhost:9200/", url);
}
public void testGetBaseUrl_GivenEndingSlash() {
String url = ElasticsearchUrlBuilder.create("http://localhost:9200/", SINGLE_INDEX, TWO_TYPES).getBaseUrl();
String url = ElasticsearchUrlBuilder.create(SINGLE_INDEX, TWO_TYPES).getBaseUrl();
assertEquals("http://localhost:9200/", url);
}
}

View File

@ -32,11 +32,8 @@ setup:
"time_format":"yyyy-MM-dd'T'HH:mm:ssX"
},
"scheduler_config": {
"data_source":"ELASTICSEARCH",
"base_url":"http://marple:9202",
"indexes":["farequote"],
"types":["response"],
"retrieve_whole_source":true
"types":["response"]
}
}