Merge branch 'master' into index-lifecycle

Colin Goodheart-Smithe 2018-09-04 10:34:49 +01:00
commit 92ab442aee
No known key found for this signature in database
GPG Key ID: F975E7BDD739B3C7
23 changed files with 1455 additions and 389 deletions


@@ -26,14 +26,15 @@ import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.client.RequestConverters.EndpointBuilder;
import org.elasticsearch.client.ml.CloseJobRequest;
import org.elasticsearch.client.ml.DeleteJobRequest;
import org.elasticsearch.client.ml.FlushJobRequest;
import org.elasticsearch.client.ml.GetBucketsRequest;
import org.elasticsearch.client.ml.GetJobRequest;
import org.elasticsearch.client.ml.GetJobStatsRequest;
import org.elasticsearch.client.ml.GetOverallBucketsRequest;
import org.elasticsearch.client.ml.GetRecordsRequest;
import org.elasticsearch.client.ml.OpenJobRequest;
import org.elasticsearch.client.ml.PutJobRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.client.ml.FlushJobRequest;
import java.io.IOException;
@@ -73,6 +74,23 @@ final class MLRequestConverters {
return request;
}
static Request getJobStats(GetJobStatsRequest getJobStatsRequest) {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_xpack")
.addPathPartAsIs("ml")
.addPathPartAsIs("anomaly_detectors")
.addPathPart(Strings.collectionToCommaDelimitedString(getJobStatsRequest.getJobIds()))
.addPathPartAsIs("_stats")
.build();
Request request = new Request(HttpGet.METHOD_NAME, endpoint);
RequestConverters.Params params = new RequestConverters.Params(request);
if (getJobStatsRequest.isAllowNoJobs() != null) {
params.putParam("allow_no_jobs", Boolean.toString(getJobStatsRequest.isAllowNoJobs()));
}
return request;
}
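// Illustrative only (not part of the converter): a request for jobs "job-1" and "job-2"
// with allow_no_jobs set builds an endpoint like:
//   GET /_xpack/ml/anomaly_detectors/job-1,job-2/_stats?allow_no_jobs=true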
static Request openJob(OpenJobRequest openJobRequest) throws IOException {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_xpack")
@@ -114,6 +132,19 @@ final class MLRequestConverters {
return request;
}
static Request flushJob(FlushJobRequest flushJobRequest) throws IOException {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_xpack")
.addPathPartAsIs("ml")
.addPathPartAsIs("anomaly_detectors")
.addPathPart(flushJobRequest.getJobId())
.addPathPartAsIs("_flush")
.build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
request.setEntity(createEntity(flushJobRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}
static Request getBuckets(GetBucketsRequest getBucketsRequest) throws IOException {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_xpack")
@@ -128,33 +159,17 @@ final class MLRequestConverters {
return request;
}
static Request flushJob(FlushJobRequest flushJobRequest) throws IOException {
static Request getOverallBuckets(GetOverallBucketsRequest getOverallBucketsRequest) throws IOException {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_xpack")
.addPathPartAsIs("ml")
.addPathPartAsIs("anomaly_detectors")
.addPathPart(flushJobRequest.getJobId())
.addPathPartAsIs("_flush")
.build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
request.setEntity(createEntity(flushJobRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}
static Request getJobStats(GetJobStatsRequest getJobStatsRequest) {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_xpack")
.addPathPartAsIs("ml")
.addPathPartAsIs("anomaly_detectors")
.addPathPart(Strings.collectionToCommaDelimitedString(getJobStatsRequest.getJobIds()))
.addPathPartAsIs("_stats")
.build();
.addPathPartAsIs("_xpack")
.addPathPartAsIs("ml")
.addPathPartAsIs("anomaly_detectors")
.addPathPart(Strings.collectionToCommaDelimitedString(getOverallBucketsRequest.getJobIds()))
.addPathPartAsIs("results")
.addPathPartAsIs("overall_buckets")
.build();
Request request = new Request(HttpGet.METHOD_NAME, endpoint);
RequestConverters.Params params = new RequestConverters.Params(request);
if (getJobStatsRequest.isAllowNoJobs() != null) {
params.putParam("allow_no_jobs", Boolean.toString(getJobStatsRequest.isAllowNoJobs()));
}
request.setEntity(createEntity(getOverallBucketsRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}


@@ -32,6 +32,8 @@ import org.elasticsearch.client.ml.GetBucketsRequest;
import org.elasticsearch.client.ml.GetBucketsResponse;
import org.elasticsearch.client.ml.GetJobRequest;
import org.elasticsearch.client.ml.GetJobResponse;
import org.elasticsearch.client.ml.GetOverallBucketsRequest;
import org.elasticsearch.client.ml.GetOverallBucketsResponse;
import org.elasticsearch.client.ml.GetRecordsRequest;
import org.elasticsearch.client.ml.GetRecordsResponse;
import org.elasticsearch.client.ml.OpenJobRequest;
@@ -136,6 +138,47 @@ public final class MachineLearningClient {
Collections.emptySet());
}
/**
* Gets usage statistics for one or more Machine Learning jobs
*
* <p>
* For additional info
* see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-get-job-stats.html">Get Job stats docs</a>
* </p>
* @param request {@link GetJobStatsRequest} Request containing a list of jobId(s) and additional options
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return {@link GetJobStatsResponse} response object containing
* the {@link JobStats} objects and the number of jobs found
* @throws IOException when there is a serialization issue sending the request or receiving the response
*/
public GetJobStatsResponse getJobStats(GetJobStatsRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request,
MLRequestConverters::getJobStats,
options,
GetJobStatsResponse::fromXContent,
Collections.emptySet());
}
/**
* Gets usage statistics for one or more Machine Learning jobs, asynchronously.
*
* <p>
* For additional info
* see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-get-job-stats.html">Get Job stats docs</a>
* </p>
* @param request {@link GetJobStatsRequest} Request containing a list of jobId(s) and additional options
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener Listener to be notified with {@link GetJobStatsResponse} upon request completion
*/
public void getJobStatsAsync(GetJobStatsRequest request, RequestOptions options, ActionListener<GetJobStatsResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(request,
MLRequestConverters::getJobStats,
options,
GetJobStatsResponse::fromXContent,
listener,
Collections.emptySet());
}
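// A minimal usage sketch, assuming a RestHighLevelClient named `client` and a job id
// "my-job" (both illustrative, not part of this class):
//
// GetJobStatsRequest statsRequest = new GetJobStatsRequest("my-job");
// statsRequest.setAllowNoJobs(true);
// GetJobStatsResponse statsResponse =
//     client.machineLearning().getJobStats(statsRequest, RequestOptions.DEFAULT);
// long jobCount = statsResponse.count();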
/**
* Deletes the given Machine Learning Job
* <p>
@@ -257,6 +300,60 @@ public final class MachineLearningClient {
Collections.emptySet());
}
/**
* Flushes internally buffered data for the given Machine Learning Job, ensuring all data sent to the job has been processed.
* This may cause new results to be calculated depending on the contents of the buffer.
*
* Both flush and close operations are similar,
* however the flush is more efficient if you are expecting to send more data for analysis.
*
* When flushing, the job remains open and is available to continue analyzing data.
* A close operation additionally prunes and persists the model state to disk and the
* job must be opened again before analyzing further data.
*
* <p>
* For additional info
* see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-flush-job.html">Flush ML job documentation</a>
*
* @param request The {@link FlushJobRequest} object enclosing the `jobId` and additional request options
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
*/
public FlushJobResponse flushJob(FlushJobRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request,
MLRequestConverters::flushJob,
options,
FlushJobResponse::fromXContent,
Collections.emptySet());
}
/**
* Flushes internally buffered data for the given Machine Learning Job asynchronously, ensuring all data sent to the job has been processed.
* This may cause new results to be calculated depending on the contents of the buffer.
*
* Both flush and close operations are similar,
* however the flush is more efficient if you are expecting to send more data for analysis.
*
* When flushing, the job remains open and is available to continue analyzing data.
* A close operation additionally prunes and persists the model state to disk and the
* job must be opened again before analyzing further data.
*
* <p>
* For additional info
* see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-flush-job.html">Flush ML job documentation</a>
*
* @param request The {@link FlushJobRequest} object enclosing the `jobId` and additional request options
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener Listener to be notified upon request completion
*/
public void flushJobAsync(FlushJobRequest request, RequestOptions options, ActionListener<FlushJobResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(request,
MLRequestConverters::flushJob,
options,
FlushJobResponse::fromXContent,
listener,
Collections.emptySet());
}
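// A minimal usage sketch, assuming a RestHighLevelClient named `client` and an open job
// "my-job" (both illustrative):
//
// FlushJobRequest flushRequest = new FlushJobRequest("my-job");
// flushRequest.setCalcInterim(true); // also calculate interim results for buffered data
// FlushJobResponse flushResponse =
//     client.machineLearning().flushJob(flushRequest, RequestOptions.DEFAULT);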
/**
* Gets the buckets for a Machine Learning Job.
* <p>
@@ -294,98 +391,42 @@ public final class MachineLearningClient {
}
/**
* Flushes internally buffered data for the given Machine Learning Job, ensuring all data sent to the job has been processed.
* This may cause new results to be calculated depending on the contents of the buffer.
*
* Both flush and close operations are similar,
* however the flush is more efficient if you are expecting to send more data for analysis.
*
* When flushing, the job remains open and is available to continue analyzing data.
* A close operation additionally prunes and persists the model state to disk and the
* job must be opened again before analyzing further data.
*
* Gets overall buckets for a set of Machine Learning Jobs.
* <p>
* For additional info
* see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-flush-job.html">Flush ML job documentation</a>
* see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-get-overall-buckets.html">
* ML GET overall buckets documentation</a>
*
* @param request The {@link FlushJobRequest} object enclosing the `jobId` and additional request options
* @param request The request
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
*/
public FlushJobResponse flushJob(FlushJobRequest request, RequestOptions options) throws IOException {
public GetOverallBucketsResponse getOverallBuckets(GetOverallBucketsRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request,
MLRequestConverters::flushJob,
options,
FlushJobResponse::fromXContent,
Collections.emptySet());
}
/**
* Flushes internally buffered data for the given Machine Learning Job asynchronously, ensuring all data sent to the job has been processed.
* This may cause new results to be calculated depending on the contents of the buffer.
*
* Both flush and close operations are similar,
* however the flush is more efficient if you are expecting to send more data for analysis.
*
* When flushing, the job remains open and is available to continue analyzing data.
* A close operation additionally prunes and persists the model state to disk and the
* job must be opened again before analyzing further data.
*
* <p>
* For additional info
* see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-flush-job.html">Flush ML job documentation</a>
*
* @param request The {@link FlushJobRequest} object enclosing the `jobId` and additional request options
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener Listener to be notified upon request completion
*/
public void flushJobAsync(FlushJobRequest request, RequestOptions options, ActionListener<FlushJobResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(request,
MLRequestConverters::flushJob,
options,
FlushJobResponse::fromXContent,
listener,
Collections.emptySet());
}
/**
* Gets usage statistics for one or more Machine Learning jobs
*
* <p>
* For additional info
* see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-get-job-stats.html">Get Job stats docs</a>
* </p>
* @param request {@link GetJobStatsRequest} Request containing a list of jobId(s) and additional options
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return {@link GetJobStatsResponse} response object containing
* the {@link JobStats} objects and the number of jobs found
* @throws IOException when there is a serialization issue sending the request or receiving the response
*/
public GetJobStatsResponse getJobStats(GetJobStatsRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request,
MLRequestConverters::getJobStats,
options,
GetJobStatsResponse::fromXContent,
Collections.emptySet());
MLRequestConverters::getOverallBuckets,
options,
GetOverallBucketsResponse::fromXContent,
Collections.emptySet());
}
/**
* Gets one or more Machine Learning job configuration info, asynchronously.
*
* Gets overall buckets for a set of Machine Learning Jobs asynchronously, notifying the listener once the requested buckets are retrieved.
* <p>
* For additional info
* see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-get-job-stats.html">Get Job stats docs</a>
* </p>
* @param request {@link GetJobStatsRequest} Request containing a list of jobId(s) and additional options
* For additional info
* see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-get-overall-buckets.html">
* ML GET overall buckets documentation</a>
*
* @param request The request
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener Listener to be notified with {@link GetJobStatsResponse} upon request completion
* @param listener Listener to be notified upon request completion
*/
public void getJobStatsAsync(GetJobStatsRequest request, RequestOptions options, ActionListener<GetJobStatsResponse> listener) {
public void getOverallBucketsAsync(GetOverallBucketsRequest request, RequestOptions options,
ActionListener<GetOverallBucketsResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(request,
MLRequestConverters::getJobStats,
options,
GetJobStatsResponse::fromXContent,
listener,
Collections.emptySet());
MLRequestConverters::getOverallBuckets,
options,
GetOverallBucketsResponse::fromXContent,
listener,
Collections.emptySet());
}
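// A minimal usage sketch, assuming a RestHighLevelClient named `client` and two job ids
// (all illustrative):
//
// GetOverallBucketsRequest obRequest = new GetOverallBucketsRequest("job-1", "job-2");
// obRequest.setTopN(2);
// obRequest.setOverallScore(75.0);
// GetOverallBucketsResponse obResponse =
//     client.machineLearning().getOverallBuckets(obRequest, RequestOptions.DEFAULT);
// long count = obResponse.count();
// List<OverallBucket> buckets = obResponse.overallBuckets();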
/**


@@ -0,0 +1,266 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.client.ml.job.config.Job;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
/**
* A request to retrieve overall buckets of a set of jobs
*/
public class GetOverallBucketsRequest extends ActionRequest implements ToXContentObject {
public static final ParseField TOP_N = new ParseField("top_n");
public static final ParseField BUCKET_SPAN = new ParseField("bucket_span");
public static final ParseField OVERALL_SCORE = new ParseField("overall_score");
public static final ParseField EXCLUDE_INTERIM = new ParseField("exclude_interim");
public static final ParseField START = new ParseField("start");
public static final ParseField END = new ParseField("end");
public static final ParseField ALLOW_NO_JOBS = new ParseField("allow_no_jobs");
private static final String ALL_JOBS = "_all";
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<GetOverallBucketsRequest, Void> PARSER = new ConstructingObjectParser<>(
"get_overall_buckets_request", a -> new GetOverallBucketsRequest((String) a[0]));
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID);
PARSER.declareInt(GetOverallBucketsRequest::setTopN, TOP_N);
PARSER.declareString(GetOverallBucketsRequest::setBucketSpan, BUCKET_SPAN);
PARSER.declareBoolean(GetOverallBucketsRequest::setExcludeInterim, EXCLUDE_INTERIM);
PARSER.declareDouble(GetOverallBucketsRequest::setOverallScore, OVERALL_SCORE);
PARSER.declareStringOrNull(GetOverallBucketsRequest::setStart, START);
PARSER.declareStringOrNull(GetOverallBucketsRequest::setEnd, END);
PARSER.declareBoolean(GetOverallBucketsRequest::setAllowNoJobs, ALLOW_NO_JOBS);
}
private final List<String> jobIds;
private Integer topN;
private TimeValue bucketSpan;
private Boolean excludeInterim;
private Double overallScore;
private String start;
private String end;
private Boolean allowNoJobs;
private GetOverallBucketsRequest(String jobId) {
this(Strings.tokenizeToStringArray(jobId, ","));
}
/**
* Constructs a request to retrieve overall buckets for a set of jobs
* @param jobIds The job identifiers. Each can be a job identifier, a group name, or a wildcard expression.
*/
public GetOverallBucketsRequest(String... jobIds) {
this(Arrays.asList(jobIds));
}
/**
* Constructs a request to retrieve overall buckets for a set of jobs
* @param jobIds The job identifiers. Each can be a job identifier, a group name, or a wildcard expression.
*/
public GetOverallBucketsRequest(List<String> jobIds) {
if (jobIds.stream().anyMatch(Objects::isNull)) {
throw new NullPointerException("jobIds must not contain null values");
}
if (jobIds.isEmpty()) {
this.jobIds = Collections.singletonList(ALL_JOBS);
} else {
this.jobIds = Collections.unmodifiableList(jobIds);
}
}
public List<String> getJobIds() {
return jobIds;
}
public Integer getTopN() {
return topN;
}
/**
* Sets the value of `top_n`.
* @param topN The number of top job bucket scores to be used in the overall_score calculation. Defaults to 1.
*/
public void setTopN(Integer topN) {
this.topN = topN;
}
public TimeValue getBucketSpan() {
return bucketSpan;
}
/**
* Sets the value of `bucket_span`.
* @param bucketSpan The span of the overall buckets. Must be greater than or equal to the largest job's bucket_span.
* Defaults to the largest job's bucket_span.
*/
public void setBucketSpan(TimeValue bucketSpan) {
this.bucketSpan = bucketSpan;
}
private void setBucketSpan(String bucketSpan) {
this.bucketSpan = TimeValue.parseTimeValue(bucketSpan, BUCKET_SPAN.getPreferredName());
}
public Boolean isExcludeInterim() {
return excludeInterim;
}
/**
* Sets the value of "exclude_interim".
* When {@code true}, interim overall buckets will be filtered out.
* Overall buckets are interim if any of the job buckets within the overall bucket interval are interim.
* @param excludeInterim value of "exclude_interim" to be set
*/
public void setExcludeInterim(Boolean excludeInterim) {
this.excludeInterim = excludeInterim;
}
public String getStart() {
return start;
}
/**
* Sets the value of "start" which is a timestamp.
* Only overall buckets whose timestamp is on or after the "start" value will be returned.
* @param start value of "start" to be set
*/
public void setStart(String start) {
this.start = start;
}
public String getEnd() {
return end;
}
/**
* Sets the value of "end" which is a timestamp.
* Only overall buckets whose timestamp is before the "end" value will be returned.
* @param end value of "end" to be set
*/
public void setEnd(String end) {
this.end = end;
}
public Double getOverallScore() {
return overallScore;
}
/**
* Sets the value of "overall_score".
* Only buckets with "overall_score" greater than or equal to this value will be returned.
* @param overallScore value of "overall_score" to be set
*/
public void setOverallScore(double overallScore) {
this.overallScore = overallScore;
}
/**
* See {@link GetJobRequest#isAllowNoJobs()}
* @param allowNoJobs whether to ignore if a wildcard expression matches no jobs
*/
public void setAllowNoJobs(boolean allowNoJobs) {
this.allowNoJobs = allowNoJobs;
}
/**
* Whether to ignore if a wildcard expression matches no jobs.
*
* If this is `false`, then an error is returned when a wildcard (or `_all`) does not match any jobs.
*/
public Boolean isAllowNoJobs() {
return allowNoJobs;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (jobIds.isEmpty() == false) {
builder.field(Job.ID.getPreferredName(), Strings.collectionToCommaDelimitedString(jobIds));
}
if (topN != null) {
builder.field(TOP_N.getPreferredName(), topN);
}
if (bucketSpan != null) {
builder.field(BUCKET_SPAN.getPreferredName(), bucketSpan.getStringRep());
}
if (excludeInterim != null) {
builder.field(EXCLUDE_INTERIM.getPreferredName(), excludeInterim);
}
if (start != null) {
builder.field(START.getPreferredName(), start);
}
if (end != null) {
builder.field(END.getPreferredName(), end);
}
if (overallScore != null) {
builder.field(OVERALL_SCORE.getPreferredName(), overallScore);
}
if (allowNoJobs != null) {
builder.field(ALLOW_NO_JOBS.getPreferredName(), allowNoJobs);
}
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(jobIds, topN, bucketSpan, excludeInterim, overallScore, start, end, allowNoJobs);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
GetOverallBucketsRequest other = (GetOverallBucketsRequest) obj;
return Objects.equals(jobIds, other.jobIds) &&
Objects.equals(topN, other.topN) &&
Objects.equals(bucketSpan, other.bucketSpan) &&
Objects.equals(excludeInterim, other.excludeInterim) &&
Objects.equals(overallScore, other.overallScore) &&
Objects.equals(start, other.start) &&
Objects.equals(end, other.end) &&
Objects.equals(allowNoJobs, other.allowNoJobs);
}
}
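// A construction sketch (the job ids are illustrative): request overall buckets spanning
// both jobs, excluding interim results and keeping only high-scoring buckets.
//
// GetOverallBucketsRequest request = new GetOverallBucketsRequest("job-1", "job-2");
// request.setBucketSpan(TimeValue.timeValueHours(1));
// request.setExcludeInterim(true);
// request.setOverallScore(75.0);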


@@ -0,0 +1,78 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml;
import org.elasticsearch.client.ml.job.results.OverallBucket;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
/**
* A response containing the requested overall buckets
*/
public class GetOverallBucketsResponse extends AbstractResultResponse<OverallBucket> {
public static final ParseField OVERALL_BUCKETS = new ParseField("overall_buckets");
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<GetOverallBucketsResponse, Void> PARSER = new ConstructingObjectParser<>(
"get_overall_buckets_response", true, a -> new GetOverallBucketsResponse((List<OverallBucket>) a[0], (long) a[1]));
static {
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), OverallBucket.PARSER, OVERALL_BUCKETS);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), COUNT);
}
public static GetOverallBucketsResponse fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
GetOverallBucketsResponse(List<OverallBucket> overallBuckets, long count) {
super(OVERALL_BUCKETS, overallBuckets, count);
}
/**
* The retrieved overall buckets
* @return the retrieved overall buckets
*/
public List<OverallBucket> overallBuckets() {
return results;
}
@Override
public int hashCode() {
return Objects.hash(count, results);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
GetOverallBucketsResponse other = (GetOverallBucketsResponse) obj;
return count == other.count && Objects.equals(results, other.results);
}
}
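// Note: instances are normally obtained via MachineLearningClient#getOverallBuckets rather
// than parsed directly; fromXContent is what the high level client uses to read the body.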


@@ -25,8 +25,12 @@ import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.client.ml.CloseJobRequest;
import org.elasticsearch.client.ml.DeleteJobRequest;
import org.elasticsearch.client.ml.FlushJobRequest;
import org.elasticsearch.client.ml.GetBucketsRequest;
import org.elasticsearch.client.ml.GetJobRequest;
import org.elasticsearch.client.ml.GetJobStatsRequest;
import org.elasticsearch.client.ml.GetOverallBucketsRequest;
import org.elasticsearch.client.ml.GetRecordsRequest;
import org.elasticsearch.client.ml.OpenJobRequest;
import org.elasticsearch.client.ml.PutJobRequest;
import org.elasticsearch.client.ml.job.config.AnalysisConfig;
@@ -36,8 +40,6 @@ import org.elasticsearch.client.ml.job.util.PageParams;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.client.ml.FlushJobRequest;
import org.elasticsearch.client.ml.GetJobStatsRequest;
import org.elasticsearch.test.ESTestCase;
import java.io.ByteArrayOutputStream;
@@ -79,6 +81,24 @@ public class MLRequestConvertersTests extends ESTestCase {
assertEquals(Boolean.toString(true), request.getParameters().get("allow_no_jobs"));
}
public void testGetJobStats() {
GetJobStatsRequest getJobStatsRequestRequest = new GetJobStatsRequest();
Request request = MLRequestConverters.getJobStats(getJobStatsRequestRequest);
assertEquals(HttpGet.METHOD_NAME, request.getMethod());
assertEquals("/_xpack/ml/anomaly_detectors/_stats", request.getEndpoint());
assertFalse(request.getParameters().containsKey("allow_no_jobs"));
getJobStatsRequestRequest = new GetJobStatsRequest("job1", "jobs*");
getJobStatsRequestRequest.setAllowNoJobs(true);
request = MLRequestConverters.getJobStats(getJobStatsRequestRequest);
assertEquals("/_xpack/ml/anomaly_detectors/job1,jobs*/_stats", request.getEndpoint());
assertEquals(Boolean.toString(true), request.getParameters().get("allow_no_jobs"));
}
public void testOpenJob() throws Exception {
String jobId = "some-job-id";
OpenJobRequest openJobRequest = new OpenJobRequest(jobId);
@@ -124,6 +144,27 @@ public class MLRequestConvertersTests extends ESTestCase {
assertEquals(Boolean.toString(true), request.getParameters().get("force"));
}
public void testFlushJob() throws Exception {
String jobId = randomAlphaOfLength(10);
FlushJobRequest flushJobRequest = new FlushJobRequest(jobId);
Request request = MLRequestConverters.flushJob(flushJobRequest);
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
assertEquals("/_xpack/ml/anomaly_detectors/" + jobId + "/_flush", request.getEndpoint());
assertEquals("{\"job_id\":\"" + jobId + "\"}", requestEntityToString(request));
flushJobRequest.setSkipTime("1000");
flushJobRequest.setStart("105");
flushJobRequest.setEnd("200");
flushJobRequest.setAdvanceTime("100");
flushJobRequest.setCalcInterim(true);
request = MLRequestConverters.flushJob(flushJobRequest);
assertEquals(
"{\"job_id\":\"" + jobId + "\",\"calc_interim\":true,\"start\":\"105\"," +
"\"end\":\"200\",\"advance_time\":\"100\",\"skip_time\":\"1000\"}",
requestEntityToString(request));
}
public void testGetBuckets() throws IOException {
String jobId = randomAlphaOfLength(10);
GetBucketsRequest getBucketsRequest = new GetBucketsRequest(jobId);
@@ -141,42 +182,42 @@ public class MLRequestConvertersTests extends ESTestCase {
}
}
public void testFlushJob() throws Exception {
public void testGetOverallBuckets() throws IOException {
String jobId = randomAlphaOfLength(10);
FlushJobRequest flushJobRequest = new FlushJobRequest(jobId);
GetOverallBucketsRequest getOverallBucketsRequest = new GetOverallBucketsRequest(jobId);
getOverallBucketsRequest.setBucketSpan(TimeValue.timeValueHours(3));
getOverallBucketsRequest.setTopN(3);
getOverallBucketsRequest.setStart("2018-08-08T00:00:00Z");
getOverallBucketsRequest.setEnd("2018-09-08T00:00:00Z");
getOverallBucketsRequest.setExcludeInterim(true);
Request request = MLRequestConverters.flushJob(flushJobRequest);
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
assertEquals("/_xpack/ml/anomaly_detectors/" + jobId + "/_flush", request.getEndpoint());
assertEquals("{\"job_id\":\"" + jobId + "\"}", requestEntityToString(request));
flushJobRequest.setSkipTime("1000");
flushJobRequest.setStart("105");
flushJobRequest.setEnd("200");
flushJobRequest.setAdvanceTime("100");
flushJobRequest.setCalcInterim(true);
request = MLRequestConverters.flushJob(flushJobRequest);
assertEquals(
"{\"job_id\":\"" + jobId + "\",\"calc_interim\":true,\"start\":\"105\"," +
"\"end\":\"200\",\"advance_time\":\"100\",\"skip_time\":\"1000\"}",
requestEntityToString(request));
Request request = MLRequestConverters.getOverallBuckets(getOverallBucketsRequest);
assertEquals(HttpGet.METHOD_NAME, request.getMethod());
assertEquals("/_xpack/ml/anomaly_detectors/" + jobId + "/results/overall_buckets", request.getEndpoint());
try (XContentParser parser = createParser(JsonXContent.jsonXContent, request.getEntity().getContent())) {
GetOverallBucketsRequest parsedRequest = GetOverallBucketsRequest.PARSER.apply(parser, null);
assertThat(parsedRequest, equalTo(getOverallBucketsRequest));
}
}
public void testGetJobStats() {
GetJobStatsRequest getJobStatsRequestRequest = new GetJobStatsRequest();
Request request = MLRequestConverters.getJobStats(getJobStatsRequestRequest);
public void testGetRecords() throws IOException {
String jobId = randomAlphaOfLength(10);
GetRecordsRequest getRecordsRequest = new GetRecordsRequest(jobId);
getRecordsRequest.setStart("2018-08-08T00:00:00Z");
getRecordsRequest.setEnd("2018-09-08T00:00:00Z");
getRecordsRequest.setPageParams(new PageParams(100, 300));
getRecordsRequest.setRecordScore(75.0);
getRecordsRequest.setSort("anomaly_score");
getRecordsRequest.setDescending(true);
getRecordsRequest.setExcludeInterim(true);
Request request = MLRequestConverters.getRecords(getRecordsRequest);
assertEquals(HttpGet.METHOD_NAME, request.getMethod());
assertEquals("/_xpack/ml/anomaly_detectors/_stats", request.getEndpoint());
assertFalse(request.getParameters().containsKey("allow_no_jobs"));
getJobStatsRequestRequest = new GetJobStatsRequest("job1", "jobs*");
getJobStatsRequestRequest.setAllowNoJobs(true);
request = MLRequestConverters.getJobStats(getJobStatsRequestRequest);
assertEquals("/_xpack/ml/anomaly_detectors/job1,jobs*/_stats", request.getEndpoint());
assertEquals(Boolean.toString(true), request.getParameters().get("allow_no_jobs"));
assertEquals("/_xpack/ml/anomaly_detectors/" + jobId + "/results/records", request.getEndpoint());
try (XContentParser parser = createParser(JsonXContent.jsonXContent, request.getEntity().getContent())) {
GetRecordsRequest parsedRequest = GetRecordsRequest.PARSER.apply(parser, null);
assertThat(parsedRequest, equalTo(getRecordsRequest));
}
}
private static Job createValidJob(String jobId) {


@@ -23,19 +23,29 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.ml.GetBucketsRequest;
import org.elasticsearch.client.ml.GetBucketsResponse;
import org.elasticsearch.client.ml.GetOverallBucketsRequest;
import org.elasticsearch.client.ml.GetOverallBucketsResponse;
import org.elasticsearch.client.ml.GetRecordsRequest;
import org.elasticsearch.client.ml.GetRecordsResponse;
import org.elasticsearch.client.ml.PutJobRequest;
import org.elasticsearch.client.ml.job.config.AnalysisConfig;
import org.elasticsearch.client.ml.job.config.DataDescription;
import org.elasticsearch.client.ml.job.config.Detector;
import org.elasticsearch.client.ml.job.config.Job;
import org.elasticsearch.client.ml.job.results.AnomalyRecord;
import org.elasticsearch.client.ml.job.results.Bucket;
import org.elasticsearch.client.ml.job.results.OverallBucket;
import org.elasticsearch.client.ml.job.util.PageParams;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -59,7 +69,7 @@ public class MachineLearningGetResultsIT extends ESRestHighLevelClientTestCase {
@Before
public void createJobAndIndexResults() throws IOException {
MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
Job job = MachineLearningIT.buildJob(JOB_ID);
Job job = buildJob(JOB_ID);
machineLearningClient.putJob(new PutJobRequest(job), RequestOptions.DEFAULT);
BulkRequest bulkRequest = new BulkRequest();
@@ -206,6 +216,111 @@ public class MachineLearningGetResultsIT extends ESRestHighLevelClientTestCase {
}
}
public void testGetOverallBuckets() throws IOException {
MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
GetBucketsRequest getBucketsRequest = new GetBucketsRequest(JOB_ID);
getBucketsRequest.setPageParams(new PageParams(0, 3));
List<Bucket> firstBuckets = machineLearningClient.getBuckets(getBucketsRequest, RequestOptions.DEFAULT).buckets();
String anotherJobId = "test-get-overall-buckets-job";
Job anotherJob = buildJob(anotherJobId);
machineLearningClient.putJob(new PutJobRequest(anotherJob), RequestOptions.DEFAULT);
// Let's index matching buckets with the score being 10.0 higher
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (Bucket bucket : firstBuckets) {
IndexRequest indexRequest = new IndexRequest(RESULTS_INDEX, DOC);
indexRequest.source("{\"job_id\":\"" + anotherJobId + "\", \"result_type\":\"bucket\", \"timestamp\": " +
bucket.getTimestamp().getTime() + "," + "\"bucket_span\": 3600,\"is_interim\": " + bucket.isInterim()
+ ", \"anomaly_score\": " + String.valueOf(bucket.getAnomalyScore() + 10.0) + "}", XContentType.JSON);
bulkRequest.add(indexRequest);
}
highLevelClient().bulk(bulkRequest, RequestOptions.DEFAULT);
{
GetOverallBucketsRequest request = new GetOverallBucketsRequest(JOB_ID, anotherJobId);
GetOverallBucketsResponse response = execute(request, machineLearningClient::getOverallBuckets,
machineLearningClient::getOverallBucketsAsync);
assertThat(response.count(), equalTo(241L));
List<OverallBucket> overallBuckets = response.overallBuckets();
assertThat(overallBuckets.size(), equalTo(241));
assertThat(overallBuckets.stream().allMatch(b -> b.getBucketSpan() == 3600L), is(true));
assertThat(overallBuckets.get(0).getTimestamp().getTime(), equalTo(START_TIME_EPOCH_MS));
assertThat(overallBuckets.get(240).isInterim(), is(true));
}
{
GetOverallBucketsRequest request = new GetOverallBucketsRequest(JOB_ID, anotherJobId);
request.setBucketSpan(TimeValue.timeValueHours(2));
GetOverallBucketsResponse response = execute(request, machineLearningClient::getOverallBuckets,
machineLearningClient::getOverallBucketsAsync);
assertThat(response.count(), equalTo(121L));
}
{
long end = START_TIME_EPOCH_MS + 10 * 3600000L;
GetOverallBucketsRequest request = new GetOverallBucketsRequest(JOB_ID, anotherJobId);
request.setEnd(String.valueOf(end));
GetOverallBucketsResponse response = execute(request, machineLearningClient::getOverallBuckets,
machineLearningClient::getOverallBucketsAsync);
assertThat(response.count(), equalTo(10L));
assertThat(response.overallBuckets().get(0).getTimestamp().getTime(), equalTo(START_TIME_EPOCH_MS));
assertThat(response.overallBuckets().get(9).getTimestamp().getTime(), equalTo(end - 3600000L));
}
{
GetOverallBucketsRequest request = new GetOverallBucketsRequest(JOB_ID, anotherJobId);
request.setExcludeInterim(true);
GetOverallBucketsResponse response = execute(request, machineLearningClient::getOverallBuckets,
machineLearningClient::getOverallBucketsAsync);
assertThat(response.count(), equalTo(240L));
assertThat(response.overallBuckets().stream().allMatch(b -> b.isInterim() == false), is(true));
}
{
GetOverallBucketsRequest request = new GetOverallBucketsRequest(JOB_ID);
request.setOverallScore(75.0);
GetOverallBucketsResponse response = execute(request, machineLearningClient::getOverallBuckets,
machineLearningClient::getOverallBucketsAsync);
assertThat(response.count(), equalTo(bucketStats.criticalCount));
assertThat(response.overallBuckets().stream().allMatch(b -> b.getOverallScore() >= 75.0), is(true));
}
{
long start = START_TIME_EPOCH_MS + 10 * 3600000L;
GetOverallBucketsRequest request = new GetOverallBucketsRequest(JOB_ID, anotherJobId);
request.setStart(String.valueOf(start));
GetOverallBucketsResponse response = execute(request, machineLearningClient::getOverallBuckets,
machineLearningClient::getOverallBucketsAsync);
assertThat(response.count(), equalTo(231L));
assertThat(response.overallBuckets().get(0).getTimestamp().getTime(), equalTo(start));
}
{
GetOverallBucketsRequest request = new GetOverallBucketsRequest(JOB_ID, anotherJobId);
request.setEnd(String.valueOf(START_TIME_EPOCH_MS + 3 * 3600000L));
request.setTopN(2);
GetOverallBucketsResponse response = execute(request, machineLearningClient::getOverallBuckets,
machineLearningClient::getOverallBucketsAsync);
assertThat(response.count(), equalTo(3L));
List<OverallBucket> overallBuckets = response.overallBuckets();
for (int i = 0; i < overallBuckets.size(); ++i) {
// As the second job has scores that are +10 from the first, the overall buckets should be +5 from the initial job
assertThat(overallBuckets.get(i).getOverallScore(), is(closeTo(firstBuckets.get(i).getAnomalyScore() + 5.0, 0.0001)));
}
}
}
public void testGetRecords() throws IOException {
MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
@@ -272,6 +387,19 @@ public class MachineLearningGetResultsIT extends ESRestHighLevelClientTestCase {
}
}
public static Job buildJob(String jobId) {
Job.Builder builder = new Job.Builder(jobId);
Detector detector = new Detector.Builder("count", null).build();
AnalysisConfig.Builder configBuilder = new AnalysisConfig.Builder(Arrays.asList(detector));
configBuilder.setBucketSpan(TimeValue.timeValueHours(1));
builder.setAnalysisConfig(configBuilder);
DataDescription.Builder dataDescription = new DataDescription.Builder();
builder.setDataDescription(dataDescription);
return builder.build();
}
private static class Stats {
// score < 50.0
private long minorCount;


@@ -20,9 +20,11 @@ package org.elasticsearch.client.documentation;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
import org.elasticsearch.client.MachineLearningGetResultsIT;
import org.elasticsearch.client.MachineLearningIT;
import org.elasticsearch.client.MlRestTestStateCleaner;
import org.elasticsearch.client.RequestOptions;
@@ -31,12 +33,16 @@ import org.elasticsearch.client.ml.CloseJobRequest;
import org.elasticsearch.client.ml.CloseJobResponse;
import org.elasticsearch.client.ml.DeleteJobRequest;
import org.elasticsearch.client.ml.DeleteJobResponse;
import org.elasticsearch.client.ml.FlushJobRequest;
import org.elasticsearch.client.ml.FlushJobResponse;
import org.elasticsearch.client.ml.GetBucketsRequest;
import org.elasticsearch.client.ml.GetBucketsResponse;
import org.elasticsearch.client.ml.GetJobRequest;
import org.elasticsearch.client.ml.GetJobResponse;
import org.elasticsearch.client.ml.GetJobStatsRequest;
import org.elasticsearch.client.ml.GetJobStatsResponse;
import org.elasticsearch.client.ml.GetOverallBucketsRequest;
import org.elasticsearch.client.ml.GetOverallBucketsResponse;
import org.elasticsearch.client.ml.GetRecordsRequest;
import org.elasticsearch.client.ml.GetRecordsResponse;
import org.elasticsearch.client.ml.OpenJobRequest;
@@ -49,12 +55,11 @@ import org.elasticsearch.client.ml.job.config.Detector;
import org.elasticsearch.client.ml.job.config.Job;
import org.elasticsearch.client.ml.job.results.AnomalyRecord;
import org.elasticsearch.client.ml.job.results.Bucket;
import org.elasticsearch.client.ml.job.results.OverallBucket;
import org.elasticsearch.client.ml.job.stats.JobStats;
import org.elasticsearch.client.ml.job.util.PageParams;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.client.ml.FlushJobRequest;
import org.elasticsearch.client.ml.FlushJobResponse;
import org.elasticsearch.client.ml.job.stats.JobStats;
import org.junit.After;
import java.io.IOException;
@@ -65,9 +70,11 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.core.Is.is;
public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
@@ -584,6 +591,107 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
}
}
public void testGetOverallBuckets() throws IOException, InterruptedException {
RestHighLevelClient client = highLevelClient();
String jobId1 = "test-get-overall-buckets-1";
String jobId2 = "test-get-overall-buckets-2";
Job job1 = MachineLearningGetResultsIT.buildJob(jobId1);
Job job2 = MachineLearningGetResultsIT.buildJob(jobId2);
client.machineLearning().putJob(new PutJobRequest(job1), RequestOptions.DEFAULT);
client.machineLearning().putJob(new PutJobRequest(job2), RequestOptions.DEFAULT);
// Let us index some buckets
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
{
IndexRequest indexRequest = new IndexRequest(".ml-anomalies-shared", "doc");
indexRequest.source("{\"job_id\":\"test-get-overall-buckets-1\", \"result_type\":\"bucket\", \"timestamp\": 1533081600000," +
"\"bucket_span\": 600,\"is_interim\": false, \"anomaly_score\": 60.0}", XContentType.JSON);
bulkRequest.add(indexRequest);
}
{
IndexRequest indexRequest = new IndexRequest(".ml-anomalies-shared", "doc");
indexRequest.source("{\"job_id\":\"test-get-overall-buckets-2\", \"result_type\":\"bucket\", \"timestamp\": 1533081600000," +
"\"bucket_span\": 3600,\"is_interim\": false, \"anomaly_score\": 100.0}", XContentType.JSON);
bulkRequest.add(indexRequest);
}
client.bulk(bulkRequest, RequestOptions.DEFAULT);
{
// tag::x-pack-ml-get-overall-buckets-request
GetOverallBucketsRequest request = new GetOverallBucketsRequest(jobId1, jobId2); // <1>
// end::x-pack-ml-get-overall-buckets-request
// tag::x-pack-ml-get-overall-buckets-bucket-span
request.setBucketSpan(TimeValue.timeValueHours(24)); // <1>
// end::x-pack-ml-get-overall-buckets-bucket-span
// tag::x-pack-ml-get-overall-buckets-end
request.setEnd("2018-08-21T00:00:00Z"); // <1>
// end::x-pack-ml-get-overall-buckets-end
// tag::x-pack-ml-get-overall-buckets-exclude-interim
request.setExcludeInterim(true); // <1>
// end::x-pack-ml-get-overall-buckets-exclude-interim
// tag::x-pack-ml-get-overall-buckets-overall-score
request.setOverallScore(75.0); // <1>
// end::x-pack-ml-get-overall-buckets-overall-score
// tag::x-pack-ml-get-overall-buckets-start
request.setStart("2018-08-01T00:00:00Z"); // <1>
// end::x-pack-ml-get-overall-buckets-start
// tag::x-pack-ml-get-overall-buckets-top-n
request.setTopN(2); // <1>
// end::x-pack-ml-get-overall-buckets-top-n
// tag::x-pack-ml-get-overall-buckets-execute
GetOverallBucketsResponse response = client.machineLearning().getOverallBuckets(request, RequestOptions.DEFAULT);
// end::x-pack-ml-get-overall-buckets-execute
// tag::x-pack-ml-get-overall-buckets-response
long count = response.count(); // <1>
List<OverallBucket> overallBuckets = response.overallBuckets(); // <2>
// end::x-pack-ml-get-overall-buckets-response
assertEquals(1, overallBuckets.size());
assertThat(overallBuckets.get(0).getOverallScore(), is(closeTo(80.0, 0.001)));
}
{
GetOverallBucketsRequest request = new GetOverallBucketsRequest(jobId1, jobId2);
// tag::x-pack-ml-get-overall-buckets-listener
ActionListener<GetOverallBucketsResponse> listener =
new ActionListener<GetOverallBucketsResponse>() {
@Override
public void onResponse(GetOverallBucketsResponse getOverallBucketsResponse) {
// <1>
}
@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::x-pack-ml-get-overall-buckets-listener
// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);
// tag::x-pack-ml-get-overall-buckets-execute-async
client.machineLearning().getOverallBucketsAsync(request, RequestOptions.DEFAULT, listener); // <1>
// end::x-pack-ml-get-overall-buckets-execute-async
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}
public void testGetRecords() throws IOException, InterruptedException {
RestHighLevelClient client = highLevelClient();


@@ -0,0 +1,67 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
import java.io.IOException;
public class GetOverallBucketsRequestTests extends AbstractXContentTestCase<GetOverallBucketsRequest> {
@Override
protected GetOverallBucketsRequest createTestInstance() {
GetOverallBucketsRequest request = new GetOverallBucketsRequest(randomAlphaOfLengthBetween(1, 20));
if (randomBoolean()) {
request.setTopN(randomIntBetween(1, 10));
}
if (randomBoolean()) {
request.setBucketSpan(TimeValue.timeValueSeconds(randomIntBetween(1, 1_000_000)));
}
if (randomBoolean()) {
request.setStart(String.valueOf(randomLong()));
}
if (randomBoolean()) {
request.setEnd(String.valueOf(randomLong()));
}
if (randomBoolean()) {
request.setExcludeInterim(randomBoolean());
}
if (randomBoolean()) {
request.setOverallScore(randomDouble());
}
if (randomBoolean()) {
request.setAllowNoJobs(randomBoolean());
}
return request;
}
@Override
protected GetOverallBucketsRequest doParseInstance(XContentParser parser) throws IOException {
return GetOverallBucketsRequest.PARSER.apply(parser, null);
}
@Override
protected boolean supportsUnknownFields() {
return false;
}
}


@@ -0,0 +1,52 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml;
import org.elasticsearch.client.ml.job.results.OverallBucket;
import org.elasticsearch.client.ml.job.results.OverallBucketTests;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class GetOverallBucketsResponseTests extends AbstractXContentTestCase<GetOverallBucketsResponse> {
@Override
protected GetOverallBucketsResponse createTestInstance() {
int listSize = randomInt(10);
List<OverallBucket> overallBuckets = new ArrayList<>(listSize);
for (int j = 0; j < listSize; j++) {
OverallBucket overallBucket = OverallBucketTests.createRandom();
overallBuckets.add(overallBucket);
}
return new GetOverallBucketsResponse(overallBuckets, listSize);
}
@Override
protected GetOverallBucketsResponse doParseInstance(XContentParser parser) throws IOException {
return GetOverallBucketsResponse.fromXContent(parser);
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
}


@@ -32,6 +32,10 @@ public class OverallBucketTests extends AbstractXContentTestCase<OverallBucket>
@Override
protected OverallBucket createTestInstance() {
return createRandom();
}
public static OverallBucket createRandom() {
int jobCount = randomIntBetween(0, 10);
List<OverallBucket.JobInfo> jobs = new ArrayList<>(jobCount);
for (int i = 0; i < jobCount; ++i) {


@@ -70,7 +70,7 @@ include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-get-buckets-s
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-get-buckets-end]
include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-get-buckets-start]
--------------------------------------------------
<1> Buckets with timestamps on or after this time will be returned.


@@ -0,0 +1,107 @@
[[java-rest-high-x-pack-ml-get-overall-buckets]]
=== Get Overall Buckets API
The Get Overall Buckets API retrieves overall bucket results that
summarize the bucket results of multiple jobs.
It accepts a `GetOverallBucketsRequest` object and responds
with a `GetOverallBucketsResponse` object.
[[java-rest-high-x-pack-ml-get-overall-buckets-request]]
==== Get Overall Buckets Request
A `GetOverallBucketsRequest` object is created referencing one or more `jobId` values.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-get-overall-buckets-request]
--------------------------------------------------
<1> Constructing a new request referencing job IDs `jobId1` and `jobId2`.
==== Optional Arguments
The following arguments are optional:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-get-overall-buckets-bucket-span]
--------------------------------------------------
<1> The span of the overall buckets. Must be greater than or equal to the jobs' largest `bucket_span`.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-get-overall-buckets-end]
--------------------------------------------------
<1> Overall buckets with timestamps earlier than this time will be returned.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-get-overall-buckets-exclude-interim]
--------------------------------------------------
<1> If `true`, interim results will be excluded. Overall buckets are interim if any of the job buckets
within the overall bucket interval are interim. Defaults to `false`.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-get-overall-buckets-overall-score]
--------------------------------------------------
<1> Overall buckets with overall scores greater than or equal to this value will be returned.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-get-overall-buckets-start]
--------------------------------------------------
<1> Overall buckets with timestamps on or after this time will be returned.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-get-overall-buckets-top-n]
--------------------------------------------------
<1> The number of top job bucket scores to be used in the `overall_score` calculation. Defaults to `1`.
[[java-rest-high-x-pack-ml-get-overall-buckets-execution]]
==== Execution
The request can be executed through the `MachineLearningClient` contained
in the `RestHighLevelClient` object, accessed via the `machineLearning()` method.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-get-overall-buckets-execute]
--------------------------------------------------
[[java-rest-high-x-pack-ml-get-overall-buckets-execution-async]]
==== Asynchronous Execution
The request can also be executed asynchronously:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-get-overall-buckets-execute-async]
--------------------------------------------------
<1> The `GetOverallBucketsRequest` to execute and the `ActionListener` to use when
the execution completes
The asynchronous method does not block and returns immediately. Once it is
completed the `ActionListener` is called back with the `onResponse` method
if the execution is successful or the `onFailure` method if the execution
failed.
A typical listener for `GetOverallBucketsResponse` looks like:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-get-overall-buckets-listener]
--------------------------------------------------
<1> `onResponse` is called back when the action is completed successfully
<2> `onFailure` is called back when some unexpected error occurs
[[java-rest-high-x-pack-ml-get-overall-buckets-response]]
==== Get Overall Buckets Response
The returned `GetOverallBucketsResponse` contains the requested buckets:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-get-overall-buckets-response]
--------------------------------------------------
<1> The count of overall buckets that were matched
<2> The overall buckets retrieved
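For instance, a caller might consume the response along these lines (a minimal
sketch; the accessor names `count()`, `overallBuckets()`, `getTimestamp()` and
`getOverallScore()` are assumptions based on the response and bucket classes):

["source","java"]
--------------------------------------------------
long count = response.count(); // number of overall buckets matched
for (OverallBucket bucket : response.overallBuckets()) {
    // each overall bucket aggregates the results of the requested jobs over one bucket span
    System.out.println(bucket.getTimestamp() + " score=" + bucket.getOverallScore());
}
--------------------------------------------------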

@ -216,6 +216,7 @@ The Java High Level REST Client supports the following Machine Learning APIs:
* <<java-rest-high-x-pack-ml-flush-job>>
* <<java-rest-high-x-pack-ml-get-job-stats>>
* <<java-rest-high-x-pack-ml-get-buckets>>
* <<java-rest-high-x-pack-ml-get-overall-buckets>>
* <<java-rest-high-x-pack-ml-get-records>>
include::ml/put-job.asciidoc[]
@ -226,6 +227,7 @@ include::ml/close-job.asciidoc[]
include::ml/flush-job.asciidoc[]
include::ml/get-job-stats.asciidoc[]
include::ml/get-buckets.asciidoc[]
include::ml/get-overall-buckets.asciidoc[]
include::ml/get-records.asciidoc[]
== Migration APIs

@ -165,12 +165,15 @@ GET index/_search
// TEST[continued]
[float]
=== Mappings
=== Consider mapping identifiers as `keyword`
The fact that some data is numeric does not mean it should always be mapped as a
<<number,numeric field>>. Typically, fields storing identifiers such as an `ISBN`
or any number identifying a record from another database, might benefit from
being mapped as <<keyword,`keyword`>> rather than `integer` or `long`.
<<number,numeric field>>. The way that Elasticsearch indexes numbers optimizes
for `range` queries while `keyword` fields are better at `term` queries. Typically,
fields storing identifiers such as an `ISBN` or any number identifying a record
from another database are rarely used in `range` queries or aggregations. This is
why they might benefit from being mapped as <<keyword,`keyword`>> rather than as
`integer` or `long`.
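As a sketch of what this looks like from Java, an identifier field can be mapped
explicitly as `keyword` when building the index mapping. The `isbn` field name is
illustrative, and the snippet assumes `XContentFactory` from the common xcontent
package:

["source","java"]
--------------------------------------------------
// Illustrative mapping: store an identifier as keyword rather than a numeric type,
// since it is queried with term lookups, not range queries.
XContentBuilder mapping = XContentFactory.jsonBuilder()
    .startObject()
        .startObject("properties")
            .startObject("isbn")
                .field("type", "keyword")
            .endObject()
        .endObject()
    .endObject();
--------------------------------------------------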
[float]
=== Avoid scripts
@ -349,15 +352,6 @@ WARNING: Loading data into the filesystem cache eagerly on too many indices or
too many files will make search _slower_ if the filesystem cache is not large
enough to hold all the data. Use with caution.
[float]
=== Map identifiers as `keyword`
When you have numeric identifiers in your documents, it is tempting to map them
as numbers, which is consistent with their json type. However, the way that
Elasticsearch indexes numbers optimizes for `range` queries while `keyword`
fields are better at `term` queries. Since identifiers are never used in `range`
queries, they should be mapped as a `keyword`.
[float]
=== Use index sorting to speed up conjunctions

@ -25,10 +25,12 @@ import java.time.DateTimeException;
import java.time.DayOfWeek;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.DateTimeParseException;
import java.time.format.ResolverStyle;
import java.time.format.SignStyle;
import java.time.temporal.ChronoField;
@ -879,11 +881,47 @@ public class DateFormatters {
/*
* Returns a formatter for parsing the milliseconds since the epoch
* This one needs a custom implementation, because the standard date formatter cannot parse negative values
* or anything within +/- 999 milliseconds around the epoch
*
* This implementation simply parses the input directly into an Instant by parsing the input as a number.
*/
private static final CompoundDateTimeFormatter EPOCH_MILLIS = new CompoundDateTimeFormatter(new DateTimeFormatterBuilder()
private static final DateTimeFormatter EPOCH_MILLIS_FORMATTER = new DateTimeFormatterBuilder()
.appendValue(ChronoField.INSTANT_SECONDS, 1, 19, SignStyle.NEVER)
.appendValue(ChronoField.MILLI_OF_SECOND, 3)
.toFormatter(Locale.ROOT));
.toFormatter(Locale.ROOT);
private static final class EpochDateTimeFormatter extends CompoundDateTimeFormatter {
private EpochDateTimeFormatter() {
super(EPOCH_MILLIS_FORMATTER);
}
private EpochDateTimeFormatter(ZoneId zoneId) {
super(EPOCH_MILLIS_FORMATTER.withZone(zoneId));
}
@Override
public TemporalAccessor parse(String input) {
try {
return Instant.ofEpochMilli(Long.valueOf(input)).atZone(ZoneOffset.UTC);
} catch (NumberFormatException e) {
throw new DateTimeParseException("invalid number", input, 0, e);
}
}
@Override
public CompoundDateTimeFormatter withZone(ZoneId zoneId) {
return new EpochDateTimeFormatter(zoneId);
}
@Override
public String format(TemporalAccessor accessor) {
return String.valueOf(Instant.from(accessor).toEpochMilli());
}
}
private static final CompoundDateTimeFormatter EPOCH_MILLIS = new EpochDateTimeFormatter();
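// Usage sketch (illustrative; mirrors DateFormattersTests): unlike the standard
// parser, this formatter round-trips values before the epoch, e.g.
//   CompoundDateTimeFormatter f = DateFormatters.forPattern("epoch_millis");
//   TemporalAccessor t = f.parse("-1522332219321"); // timestamp before the epoch
//   assert "-1522332219321".equals(f.format(t));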
/*
* Returns a formatter that combines a full date and two digit hour of

@ -71,7 +71,15 @@ public class JavaJodaTimeDuellingTests extends ESTestCase {
public void testDuellingFormatsValidParsing() {
assertSameDate("1522332219", "epoch_second");
assertSameDate("0", "epoch_second");
assertSameDate("1", "epoch_second");
assertSameDate("-1", "epoch_second");
assertSameDate("-1522332219", "epoch_second");
assertSameDate("1522332219321", "epoch_millis");
assertSameDate("0", "epoch_millis");
assertSameDate("1", "epoch_millis");
assertSameDate("-1", "epoch_millis");
assertSameDate("-1522332219321", "epoch_millis");
assertSameDate("20181126", "basic_date");
assertSameDate("20181126T121212.123Z", "basic_date_time");

@ -0,0 +1,73 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.time;
import org.elasticsearch.test.ESTestCase;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeParseException;
import java.time.temporal.TemporalAccessor;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
public class DateFormattersTests extends ESTestCase {
// the epoch milli parser is a bit special, as it does not use a date formatter; see the comments in DateFormatters
public void testEpochMilliParser() {
CompoundDateTimeFormatter formatter = DateFormatters.forPattern("epoch_millis");
DateTimeParseException e = expectThrows(DateTimeParseException.class, () -> formatter.parse("invalid"));
assertThat(e.getMessage(), containsString("invalid number"));
// a different zone should still yield the same output, as epoch millis are time zone independent
ZoneId zoneId = randomZone();
CompoundDateTimeFormatter zonedFormatter = formatter.withZone(zoneId);
assertThat(zonedFormatter.printer.getZone(), is(zoneId));
// test with negative and non-negative values
assertThatSameDateTime(formatter, zonedFormatter, randomNonNegativeLong() * -1);
assertThatSameDateTime(formatter, zonedFormatter, randomNonNegativeLong());
assertThatSameDateTime(formatter, zonedFormatter, 0);
assertThatSameDateTime(formatter, zonedFormatter, -1);
assertThatSameDateTime(formatter, zonedFormatter, 1);
// format() output should be equal as well
assertSameFormat(formatter, randomNonNegativeLong() * -1);
assertSameFormat(formatter, randomNonNegativeLong());
assertSameFormat(formatter, 0);
assertSameFormat(formatter, -1);
assertSameFormat(formatter, 1);
}
private void assertThatSameDateTime(CompoundDateTimeFormatter formatter, CompoundDateTimeFormatter zonedFormatter, long millis) {
String millisAsString = String.valueOf(millis);
ZonedDateTime formatterZonedDateTime = DateFormatters.toZonedDateTime(formatter.parse(millisAsString));
ZonedDateTime zonedFormatterZonedDateTime = DateFormatters.toZonedDateTime(zonedFormatter.parse(millisAsString));
assertThat(formatterZonedDateTime.toInstant().toEpochMilli(), is(zonedFormatterZonedDateTime.toInstant().toEpochMilli()));
}
private void assertSameFormat(CompoundDateTimeFormatter formatter, long millis) {
String millisAsString = String.valueOf(millis);
TemporalAccessor accessor = formatter.parse(millisAsString);
assertThat(millisAsString, is(formatter.format(accessor)));
}
}

@ -20,21 +20,25 @@
package org.elasticsearch.indices;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.cache.request.RequestCacheStats;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.joda.time.DateTimeZone;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import static org.elasticsearch.search.aggregations.AggregationBuilders.dateHistogram;
@ -107,41 +111,35 @@ public class IndicesRequestCacheIT extends ESIntegTestCase {
client.prepareIndex("index", "type", "8").setRouting("3").setSource("s", "2016-03-26"),
client.prepareIndex("index", "type", "9").setRouting("3").setSource("s", "2016-03-27"));
ensureSearchable("index");
assertCacheState(client, "index", 0, 0);
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(0L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(0L));
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
ElasticsearchAssertions.assertAllSuccessful(forceMergeResponse);
refresh();
ensureSearchable("index");
assertCacheState(client, "index", 0, 0);
final SearchResponse r1 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0)
.setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-19").lte("2016-03-25")).setPreFilterShardSize(Integer.MAX_VALUE)
.get();
assertSearchResponse(r1);
.setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-19").lte("2016-03-25")).setPreFilterShardSize(Integer.MAX_VALUE).get();
ElasticsearchAssertions.assertAllSuccessful(r1);
assertThat(r1.getHits().getTotalHits(), equalTo(7L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(0L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(5L));
assertCacheState(client, "index", 0, 5);
final SearchResponse r2 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0)
.setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-20").lte("2016-03-26"))
.setPreFilterShardSize(Integer.MAX_VALUE).get();
assertSearchResponse(r2);
ElasticsearchAssertions.assertAllSuccessful(r2);
assertThat(r2.getHits().getTotalHits(), equalTo(7L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(3L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(7L));
assertCacheState(client, "index", 3, 7);
final SearchResponse r3 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0)
.setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-21").lte("2016-03-27")).setPreFilterShardSize(Integer.MAX_VALUE)
.get();
assertSearchResponse(r3);
ElasticsearchAssertions.assertAllSuccessful(r3);
assertThat(r3.getHits().getTotalHits(), equalTo(7L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(6L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(9L));
assertCacheState(client, "index", 6, 9);
}
public void testQueryRewriteMissingValues() throws Exception {
@ -159,38 +157,33 @@ public class IndicesRequestCacheIT extends ESIntegTestCase {
client.prepareIndex("index", "type", "8").setSource("s", "2016-03-26"),
client.prepareIndex("index", "type", "9").setSource("s", "2016-03-27"));
ensureSearchable("index");
assertCacheState(client, "index", 0, 0);
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(0L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(0L));
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
ElasticsearchAssertions.assertAllSuccessful(forceMergeResponse);
refresh();
ensureSearchable("index");
assertCacheState(client, "index", 0, 0);
final SearchResponse r1 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0)
.setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-19").lte("2016-03-28")).get();
assertSearchResponse(r1);
ElasticsearchAssertions.assertAllSuccessful(r1);
assertThat(r1.getHits().getTotalHits(), equalTo(8L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(0L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(1L));
assertCacheState(client, "index", 0, 1);
final SearchResponse r2 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0)
.setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-19").lte("2016-03-28")).get();
assertSearchResponse(r2);
ElasticsearchAssertions.assertAllSuccessful(r2);
assertThat(r2.getHits().getTotalHits(), equalTo(8L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(1L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(1L));
assertCacheState(client, "index", 1, 1);
final SearchResponse r3 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0)
.setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-19").lte("2016-03-28")).get();
assertSearchResponse(r3);
ElasticsearchAssertions.assertAllSuccessful(r3);
assertThat(r3.getHits().getTotalHits(), equalTo(8L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(2L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(1L));
assertCacheState(client, "index", 2, 1);
}
public void testQueryRewriteDates() throws Exception {
@ -208,41 +201,36 @@ public class IndicesRequestCacheIT extends ESIntegTestCase {
client.prepareIndex("index", "type", "8").setSource("d", "2014-08-01T00:00:00"),
client.prepareIndex("index", "type", "9").setSource("d", "2014-09-01T00:00:00"));
ensureSearchable("index");
assertCacheState(client, "index", 0, 0);
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(0L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(0L));
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
ElasticsearchAssertions.assertAllSuccessful(forceMergeResponse);
refresh();
ensureSearchable("index");
assertCacheState(client, "index", 0, 0);
final SearchResponse r1 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0)
.setQuery(QueryBuilders.rangeQuery("d").gte("2013-01-01T00:00:00").lte("now"))
.get();
assertSearchResponse(r1);
ElasticsearchAssertions.assertAllSuccessful(r1);
assertThat(r1.getHits().getTotalHits(), equalTo(9L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(0L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(1L));
assertCacheState(client, "index", 0, 1);
final SearchResponse r2 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0)
.setQuery(QueryBuilders.rangeQuery("d").gte("2013-01-01T00:00:00").lte("now"))
.get();
assertSearchResponse(r2);
ElasticsearchAssertions.assertAllSuccessful(r2);
assertThat(r2.getHits().getTotalHits(), equalTo(9L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(1L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(1L));
assertCacheState(client, "index", 1, 1);
final SearchResponse r3 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0)
.setQuery(QueryBuilders.rangeQuery("d").gte("2013-01-01T00:00:00").lte("now"))
.get();
assertSearchResponse(r3);
ElasticsearchAssertions.assertAllSuccessful(r3);
assertThat(r3.getHits().getTotalHits(), equalTo(9L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(2L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(1L));
assertCacheState(client, "index", 2, 1);
}
public void testQueryRewriteDatesWithNow() throws Exception {
@ -266,98 +254,47 @@ public class IndicesRequestCacheIT extends ESIntegTestCase {
client.prepareIndex("index-3", "type", "8").setSource("d", now.minusDays(7)),
client.prepareIndex("index-3", "type", "9").setSource("d", now.minusDays(8)));
ensureSearchable("index-1", "index-2", "index-3");
assertCacheState(client, "index-1", 0, 0);
assertCacheState(client, "index-2", 0, 0);
assertCacheState(client, "index-3", 0, 0);
assertThat(
client.admin().indices().prepareStats("index-1").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(0L));
assertThat(
client.admin().indices().prepareStats("index-1").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(0L));
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index-1", "index-2", "index-3").setFlush(true)
.get();
ElasticsearchAssertions.assertAllSuccessful(forceMergeResponse);
refresh();
ensureSearchable("index-1", "index-2", "index-3");
assertThat(
client.admin().indices().prepareStats("index-2").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(0L));
assertThat(
client.admin().indices().prepareStats("index-2").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(0L));
assertThat(
client.admin().indices().prepareStats("index-3").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(0L));
assertThat(
client.admin().indices().prepareStats("index-3").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(0L));
assertCacheState(client, "index-1", 0, 0);
assertCacheState(client, "index-2", 0, 0);
assertCacheState(client, "index-3", 0, 0);
final SearchResponse r1 = client.prepareSearch("index-*").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0)
.setQuery(QueryBuilders.rangeQuery("d").gte("now-7d/d").lte("now")).get();
assertSearchResponse(r1);
ElasticsearchAssertions.assertAllSuccessful(r1);
assertThat(r1.getHits().getTotalHits(), equalTo(8L));
assertThat(
client.admin().indices().prepareStats("index-1").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(0L));
assertThat(
client.admin().indices().prepareStats("index-1").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(1L));
assertThat(
client.admin().indices().prepareStats("index-2").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(0L));
assertThat(
client.admin().indices().prepareStats("index-2").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(1L));
assertCacheState(client, "index-1", 0, 1);
assertCacheState(client, "index-2", 0, 1);
// Because the query will INTERSECT with the 3rd index it will not be
// rewritten and will still contain `now`, so it won't be recorded as a
// cache miss or a cache hit, since queries containing now can't be cached
assertThat(
client.admin().indices().prepareStats("index-3").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(0L));
assertThat(
client.admin().indices().prepareStats("index-3").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(0L));
assertCacheState(client, "index-3", 0, 0);
final SearchResponse r2 = client.prepareSearch("index-*").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0)
.setQuery(QueryBuilders.rangeQuery("d").gte("now-7d/d").lte("now")).get();
assertSearchResponse(r2);
ElasticsearchAssertions.assertAllSuccessful(r2);
assertThat(r2.getHits().getTotalHits(), equalTo(8L));
assertThat(
client.admin().indices().prepareStats("index-1").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(1L));
assertThat(
client.admin().indices().prepareStats("index-1").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(1L));
assertThat(
client.admin().indices().prepareStats("index-2").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(1L));
assertThat(
client.admin().indices().prepareStats("index-2").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(1L));
assertThat(
client.admin().indices().prepareStats("index-3").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(0L));
assertThat(
client.admin().indices().prepareStats("index-3").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(0L));
assertCacheState(client, "index-1", 1, 1);
assertCacheState(client, "index-2", 1, 1);
assertCacheState(client, "index-3", 0, 0);
final SearchResponse r3 = client.prepareSearch("index-*").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0)
.setQuery(QueryBuilders.rangeQuery("d").gte("now-7d/d").lte("now")).get();
assertSearchResponse(r3);
ElasticsearchAssertions.assertAllSuccessful(r3);
assertThat(r3.getHits().getTotalHits(), equalTo(8L));
assertThat(
client.admin().indices().prepareStats("index-1").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(2L));
assertThat(
client.admin().indices().prepareStats("index-1").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(1L));
assertThat(
client.admin().indices().prepareStats("index-2").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(2L));
assertThat(
client.admin().indices().prepareStats("index-2").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(1L));
assertThat(
client.admin().indices().prepareStats("index-3").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(0L));
assertThat(
client.admin().indices().prepareStats("index-3").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(0L));
assertCacheState(client, "index-1", 2, 1);
assertCacheState(client, "index-2", 2, 1);
assertCacheState(client, "index-3", 0, 0);
}
public void testCanCache() throws Exception {
@ -378,74 +315,60 @@ public class IndicesRequestCacheIT extends ESIntegTestCase {
client.prepareIndex("index", "type", "8").setRouting("3").setSource("s", "2016-03-26"),
client.prepareIndex("index", "type", "9").setRouting("3").setSource("s", "2016-03-27"));
ensureSearchable("index");
assertCacheState(client, "index", 0, 0);
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(0L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(0L));
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
ElasticsearchAssertions.assertAllSuccessful(forceMergeResponse);
refresh();
ensureSearchable("index");
assertCacheState(client, "index", 0, 0);
// If size > 0 we should not cache by default
final SearchResponse r1 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(1)
.setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-19").lte("2016-03-25")).get();
assertSearchResponse(r1);
ElasticsearchAssertions.assertAllSuccessful(r1);
assertThat(r1.getHits().getTotalHits(), equalTo(7L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(0L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(0L));
assertCacheState(client, "index", 0, 0);
// If search type is DFS_QUERY_THEN_FETCH we should not cache
final SearchResponse r2 = client.prepareSearch("index").setSearchType(SearchType.DFS_QUERY_THEN_FETCH).setSize(0)
.setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-20").lte("2016-03-26")).get();
assertSearchResponse(r2);
ElasticsearchAssertions.assertAllSuccessful(r2);
assertThat(r2.getHits().getTotalHits(), equalTo(7L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(0L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(0L));
assertCacheState(client, "index", 0, 0);
// If search type is DFS_QUERY_THEN_FETCH we should not cache even if
// the cache flag is explicitly set on the request
final SearchResponse r3 = client.prepareSearch("index").setSearchType(SearchType.DFS_QUERY_THEN_FETCH).setSize(0)
.setRequestCache(true).setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-20").lte("2016-03-26")).get();
assertSearchResponse(r3);
ElasticsearchAssertions.assertAllSuccessful(r3);
assertThat(r3.getHits().getTotalHits(), equalTo(7L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(0L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(0L));
assertCacheState(client, "index", 0, 0);
// If the request has a non-filter aggregation containing now we should not cache
final SearchResponse r5 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0)
.setRequestCache(true).setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-20").lte("2016-03-26"))
.addAggregation(dateRange("foo").field("s").addRange("now-10y", "now")).get();
assertSearchResponse(r5);
ElasticsearchAssertions.assertAllSuccessful(r5);
assertThat(r5.getHits().getTotalHits(), equalTo(7L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(0L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(0L));
assertCacheState(client, "index", 0, 0);
// If size > 0 and the cache flag is set on the request we should cache
final SearchResponse r6 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(1)
.setRequestCache(true).setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-21").lte("2016-03-27")).get();
assertSearchResponse(r6);
ElasticsearchAssertions.assertAllSuccessful(r6);
assertThat(r6.getHits().getTotalHits(), equalTo(7L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(0L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(2L));
assertCacheState(client, "index", 0, 2);
// If the request has a filter aggregation containing now we should cache since it gets rewritten
final SearchResponse r4 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0)
.setRequestCache(true).setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-20").lte("2016-03-26"))
.addAggregation(filter("foo", QueryBuilders.rangeQuery("s").from("now-10y").to("now"))).get();
assertSearchResponse(r4);
ElasticsearchAssertions.assertAllSuccessful(r4);
assertThat(r4.getHits().getTotalHits(), equalTo(7L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(0L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(4L));
assertCacheState(client, "index", 0, 4);
}
public void testCacheWithFilteredAlias() {
@ -460,45 +383,42 @@ public class IndicesRequestCacheIT extends ESIntegTestCase {
client.prepareIndex("index", "type", "1").setRouting("1").setSource("created_at",
DateTimeFormatter.ISO_LOCAL_DATE.format(now)).get();
refresh();
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
ElasticsearchAssertions.assertAllSuccessful(forceMergeResponse);
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(0L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(0L));
assertCacheState(client, "index", 0, 0);
SearchResponse r1 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0)
.setQuery(QueryBuilders.rangeQuery("created_at").gte("now-7d/d")).get();
assertSearchResponse(r1);
ElasticsearchAssertions.assertAllSuccessful(r1);
assertThat(r1.getHits().getTotalHits(), equalTo(1L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(0L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(1L));
assertCacheState(client, "index", 0, 1);
r1 = client.prepareSearch("index").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0)
.setQuery(QueryBuilders.rangeQuery("created_at").gte("now-7d/d")).get();
assertSearchResponse(r1);
ElasticsearchAssertions.assertAllSuccessful(r1);
assertThat(r1.getHits().getTotalHits(), equalTo(1L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(1L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(1L));
assertCacheState(client, "index", 1, 1);
r1 = client.prepareSearch("last_week").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0).get();
assertSearchResponse(r1);
ElasticsearchAssertions.assertAllSuccessful(r1);
assertThat(r1.getHits().getTotalHits(), equalTo(1L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(1L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(2L));
assertCacheState(client, "index", 1, 2);
r1 = client.prepareSearch("last_week").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0).get();
assertSearchResponse(r1);
ElasticsearchAssertions.assertAllSuccessful(r1);
assertThat(r1.getHits().getTotalHits(), equalTo(1L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getHitCount(),
equalTo(2L));
assertThat(client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMissCount(),
equalTo(2L));
assertCacheState(client, "index", 2, 2);
}
private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) {
RequestCacheStats requestCacheStats = client.admin().indices().prepareStats(index).setRequestCache(true).get().getTotal()
.getRequestCache();
// Check the hit count and miss count together so if they are not
// correct we can see both values
assertEquals(Arrays.asList(expectedHits, expectedMisses),
Arrays.asList(requestCacheStats.getHitCount(), requestCacheStats.getMissCount()));
}
}

@ -73,14 +73,13 @@ public class StopDatafeedAction extends Action<StopDatafeedAction.Response> {
}
private String datafeedId;
private String[] resolvedStartedDatafeedIds;
private String[] resolvedStartedDatafeedIds = new String[] {};
private TimeValue stopTimeout = DEFAULT_TIMEOUT;
private boolean force = false;
private boolean allowNoDatafeeds = true;
public Request(String datafeedId) {
this.datafeedId = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName());
this.resolvedStartedDatafeedIds = new String[] { datafeedId };
}
public Request() {

@ -24,6 +24,9 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
if (randomBoolean()) {
request.setAllowNoDatafeeds(randomBoolean());
}
if (randomBoolean()) {
request.setResolvedStartedDatafeedIds(generateRandomStringArray(4, 8, false));
}
return request;
}

@ -125,6 +125,20 @@ public class DateHistogramGroupConfigSerializingTests extends AbstractSerializin
assertThat(e.validationErrors().size(), equalTo(0));
}
public void testValidateWeek() {
ActionRequestValidationException e = new ActionRequestValidationException();
Map<String, Map<String, FieldCapabilities>> responseMap = new HashMap<>();
// Have to mock fieldcaps because the ctors aren't public...
FieldCapabilities fieldCaps = mock(FieldCapabilities.class);
when(fieldCaps.isAggregatable()).thenReturn(true);
responseMap.put("my_field", Collections.singletonMap("date", fieldCaps));
DateHistogramGroupConfig config = new DateHistogramGroupConfig("my_field", new DateHistogramInterval("1w"), null, null);
config.validateMappings(responseMap, e);
assertThat(e.validationErrors().size(), equalTo(0));
}
/**
* Tests that a DateHistogramGroupConfig can be serialized/deserialized correctly after
* the timezone was changed from DateTimeZone to String.

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.rollup;
import org.elasticsearch.common.rounding.DateTimeUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
@ -18,9 +19,7 @@ import org.elasticsearch.xpack.core.rollup.job.DateHistogramGroupConfig;
import org.joda.time.DateTimeZone;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -33,30 +32,7 @@ import java.util.Set;
*/
public class RollupJobIdentifierUtils {
private static final Comparator<RollupJobCaps> COMPARATOR = RollupJobIdentifierUtils.getComparator();
public static final Map<String, Integer> CALENDAR_ORDERING;
static {
Map<String, Integer> dateFieldUnits = new HashMap<>(16);
dateFieldUnits.put("year", 8);
dateFieldUnits.put("1y", 8);
dateFieldUnits.put("quarter", 7);
dateFieldUnits.put("1q", 7);
dateFieldUnits.put("month", 6);
dateFieldUnits.put("1M", 6);
dateFieldUnits.put("week", 5);
dateFieldUnits.put("1w", 5);
dateFieldUnits.put("day", 4);
dateFieldUnits.put("1d", 4);
dateFieldUnits.put("hour", 3);
dateFieldUnits.put("1h", 3);
dateFieldUnits.put("minute", 2);
dateFieldUnits.put("1m", 2);
dateFieldUnits.put("second", 1);
dateFieldUnits.put("1s", 1);
CALENDAR_ORDERING = Collections.unmodifiableMap(dateFieldUnits);
}
static final Comparator<RollupJobCaps> COMPARATOR = RollupJobIdentifierUtils.getComparator();
/**
* Given the aggregation tree and a list of available job capabilities, this method will return a set
@ -176,8 +152,10 @@ public class RollupJobIdentifierUtils {
// The request must be gte the config. The CALENDAR_ORDERING map values are integers representing
// relative orders between the calendar units
int requestOrder = CALENDAR_ORDERING.getOrDefault(requestInterval.toString(), Integer.MAX_VALUE);
int configOrder = CALENDAR_ORDERING.getOrDefault(configInterval.toString(), Integer.MAX_VALUE);
DateTimeUnit requestUnit = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(requestInterval.toString());
long requestOrder = requestUnit.field(DateTimeZone.UTC).getDurationField().getUnitMillis();
DateTimeUnit configUnit = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(configInterval.toString());
long configOrder = configUnit.field(DateTimeZone.UTC).getDurationField().getUnitMillis();
// All calendar units are naturally multiples of one another, so we just care about gte
return requestOrder >= configOrder;
@ -190,7 +168,7 @@ public class RollupJobIdentifierUtils {
return false;
}
// Both are fixed, good to conver to millis now
// Both are fixed, good to convert to millis now
long configIntervalMillis = TimeValue.parseTimeValue(configInterval.toString(),
"date_histo.config.interval").getMillis();
long requestIntervalMillis = TimeValue.parseTimeValue(requestInterval.toString(),
@ -326,8 +304,8 @@ public class RollupJobIdentifierUtils {
return 0;
}
TimeValue thisTime = null;
TimeValue thatTime = null;
long thisTime = Long.MAX_VALUE;
long thatTime = Long.MAX_VALUE;
// histogram intervals are averaged and compared, with the idea that
// a larger average == better, because it will generate fewer documents
@ -344,7 +322,7 @@ public class RollupJobIdentifierUtils {
for (RollupJobCaps.RollupFieldCaps fieldCaps : o1.getFieldCaps().values()) {
for (Map<String, Object> agg : fieldCaps.getAggs()) {
if (agg.get(RollupField.AGG).equals(DateHistogramAggregationBuilder.NAME)) {
thisTime = TimeValue.parseTimeValue((String) agg.get(RollupField.INTERVAL), RollupField.INTERVAL);
thisTime = getMillisFixedOrCalendar((String) agg.get(RollupField.INTERVAL));
} else if (agg.get(RollupField.AGG).equals(HistogramAggregationBuilder.NAME)) {
thisHistoWeights += (long) agg.get(RollupField.INTERVAL);
counter += 1;
@ -360,7 +338,7 @@ public class RollupJobIdentifierUtils {
for (RollupJobCaps.RollupFieldCaps fieldCaps : o2.getFieldCaps().values()) {
for (Map<String, Object> agg : fieldCaps.getAggs()) {
if (agg.get(RollupField.AGG).equals(DateHistogramAggregationBuilder.NAME)) {
thatTime = TimeValue.parseTimeValue((String) agg.get(RollupField.INTERVAL), RollupField.INTERVAL);
thatTime = getMillisFixedOrCalendar((String) agg.get(RollupField.INTERVAL));
} else if (agg.get(RollupField.AGG).equals(HistogramAggregationBuilder.NAME)) {
thatHistoWeights += (long) agg.get(RollupField.INTERVAL);
counter += 1;
@ -371,13 +349,9 @@ public class RollupJobIdentifierUtils {
}
thatHistoWeights = counter == 0 ? 0 : thatHistoWeights / counter;
// DateHistos are mandatory so these should always be present no matter what
assert thisTime != null;
assert thatTime != null;
// Compare on date interval first
// The "smaller" job is the one with the larger interval
int timeCompare = thisTime.compareTo(thatTime);
int timeCompare = Long.compare(thisTime, thatTime);
if (timeCompare != 0) {
return -timeCompare;
}
@ -409,4 +383,14 @@ public class RollupJobIdentifierUtils {
// coverage
};
}
static long getMillisFixedOrCalendar(String value) {
DateHistogramInterval interval = new DateHistogramInterval(value);
if (isCalendarInterval(interval)) {
DateTimeUnit intervalUnit = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(interval.toString());
return intervalUnit.field(DateTimeZone.UTC).getDurationField().getUnitMillis();
} else {
return TimeValue.parseTimeValue(value, "date_histo.comparator.interval").getMillis();
}
}
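// Comparison sketch (values illustrative): both interval styles reduce to one
// millisecond scale, e.g. getMillisFixedOrCalendar("60s") == 60000 (fixed), while
// getMillisFixedOrCalendar("1w") yields the week unit's millis (604800000), so
// fixed and calendar intervals can be ordered directly against each other.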
}

@ -15,6 +15,7 @@ import org.elasticsearch.search.aggregations.metrics.min.MinAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.rollup.RollupField;
import org.elasticsearch.xpack.core.rollup.action.RollupJobCaps;
import org.elasticsearch.xpack.core.rollup.job.DateHistogramGroupConfig;
import org.elasticsearch.xpack.core.rollup.job.GroupConfig;
@ -24,17 +25,22 @@ import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig;
import org.elasticsearch.xpack.core.rollup.job.TermsGroupConfig;
import org.joda.time.DateTimeZone;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
public class RollupJobIdentifierUtilTests extends ESTestCase {
private static final List<String> UNITS = new ArrayList<>(DateHistogramAggregationBuilder.DATE_FIELD_UNITS.keySet());
public void testOneMatch() {
final GroupConfig group = new GroupConfig(new DateHistogramGroupConfig("foo", new DateHistogramInterval("1h")));
final RollupJobConfig job = new RollupJobConfig("foo", "index", "rollup", "*/5 * * * * ?", 10, group, emptyList(), null);
@ -577,6 +583,124 @@ public class RollupJobIdentifierUtilTests extends ESTestCase {
assertFalse(valid);
}
public void testComparatorMixed() {
int numCaps = randomIntBetween(1, 10);
List<RollupJobCaps> caps = new ArrayList<>(numCaps);
for (int i = 0; i < numCaps; i++) {
DateHistogramInterval interval = getRandomInterval();
GroupConfig group = new GroupConfig(new DateHistogramGroupConfig("foo", interval));
RollupJobConfig job = new RollupJobConfig("foo", "index", "rollup", "*/5 * * * * ?", 10, group, emptyList(), null);
RollupJobCaps cap = new RollupJobCaps(job);
caps.add(cap);
}
caps.sort(RollupJobIdentifierUtils.COMPARATOR);
// This only tests for calendar/fixed ordering, ignoring the other criteria
for (int i = 1; i < numCaps; i++) {
RollupJobCaps a = caps.get(i - 1);
RollupJobCaps b = caps.get(i);
long aMillis = getMillis(a);
long bMillis = getMillis(b);
assertThat(aMillis, greaterThanOrEqualTo(bMillis));
}
}
public void testComparatorFixed() {
int numCaps = randomIntBetween(1, 10);
List<RollupJobCaps> caps = new ArrayList<>(numCaps);
for (int i = 0; i < numCaps; i++) {
DateHistogramInterval interval = getRandomFixedInterval();
GroupConfig group = new GroupConfig(new DateHistogramGroupConfig("foo", interval));
RollupJobConfig job = new RollupJobConfig("foo", "index", "rollup", "*/5 * * * * ?", 10, group, emptyList(), null);
RollupJobCaps cap = new RollupJobCaps(job);
caps.add(cap);
}
caps.sort(RollupJobIdentifierUtils.COMPARATOR);
// This only tests for fixed ordering, ignoring the other criteria
for (int i = 1; i < numCaps; i++) {
RollupJobCaps a = caps.get(i - 1);
RollupJobCaps b = caps.get(i);
long aMillis = getMillis(a);
long bMillis = getMillis(b);
assertThat(aMillis, greaterThanOrEqualTo(bMillis));
}
}
public void testComparatorCalendar() {
int numCaps = randomIntBetween(1, 10);
List<RollupJobCaps> caps = new ArrayList<>(numCaps);
for (int i = 0; i < numCaps; i++) {
DateHistogramInterval interval = getRandomCalendarInterval();
GroupConfig group = new GroupConfig(new DateHistogramGroupConfig("foo", interval));
RollupJobConfig job = new RollupJobConfig("foo", "index", "rollup", "*/5 * * * * ?", 10, group, emptyList(), null);
RollupJobCaps cap = new RollupJobCaps(job);
caps.add(cap);
}
caps.sort(RollupJobIdentifierUtils.COMPARATOR);
// This only tests for calendar ordering, ignoring the other criteria
for (int i = 1; i < numCaps; i++) {
RollupJobCaps a = caps.get(i - 1);
RollupJobCaps b = caps.get(i);
long aMillis = getMillis(a);
long bMillis = getMillis(b);
assertThat(aMillis, greaterThanOrEqualTo(bMillis));
}
}
private static long getMillis(RollupJobCaps cap) {
for (RollupJobCaps.RollupFieldCaps fieldCaps : cap.getFieldCaps().values()) {
for (Map<String, Object> agg : fieldCaps.getAggs()) {
if (agg.get(RollupField.AGG).equals(DateHistogramAggregationBuilder.NAME)) {
return RollupJobIdentifierUtils.getMillisFixedOrCalendar((String) agg.get(RollupField.INTERVAL));
}
}
}
return Long.MAX_VALUE;
}
private static DateHistogramInterval getRandomInterval() {
if (randomBoolean()) {
return getRandomFixedInterval();
}
return getRandomCalendarInterval();
}
private static DateHistogramInterval getRandomFixedInterval() {
int value = randomIntBetween(1, 1000);
String unit;
int randomValue = randomInt(4);
if (randomValue == 0) {
unit = "ms";
} else if (randomValue == 1) {
unit = "s";
} else if (randomValue == 2) {
unit = "m";
} else if (randomValue == 3) {
unit = "h";
} else {
unit = "d";
}
return new DateHistogramInterval(Integer.toString(value) + unit);
}
private static DateHistogramInterval getRandomCalendarInterval() {
return new DateHistogramInterval(UNITS.get(randomIntBetween(0, UNITS.size()-1)));
}
private Set<RollupJobCaps> singletonSet(RollupJobCaps cap) {
Set<RollupJobCaps> caps = new HashSet<>();
caps.add(cap);