Made client calls non blocking in JobProvider#modelSnapshots(...)

Original commit: elastic/x-pack-elasticsearch@00790a5336
This commit is contained in:
Martijn van Groningen 2017-01-10 08:09:26 +01:00
parent 1d81509616
commit 10d8a52b23
7 changed files with 194 additions and 242 deletions

View File

@ -28,9 +28,9 @@ import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.ModelSnapshot; import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
import org.elasticsearch.xpack.prelert.job.manager.JobManager; import org.elasticsearch.xpack.prelert.job.manager.JobManager;
import org.elasticsearch.xpack.prelert.job.messages.Messages; import org.elasticsearch.xpack.prelert.job.messages.Messages;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataDeleter;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataDeleterFactory; import org.elasticsearch.xpack.prelert.job.persistence.JobDataDeleterFactory;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider; import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataDeleter;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage; import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper; import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
@ -148,54 +148,54 @@ public class DeleteModelSnapshotAction extends Action<DeleteModelSnapshotAction.
@Override @Override
protected void doExecute(Request request, ActionListener<Response> listener) { protected void doExecute(Request request, ActionListener<Response> listener) {
// Verify the snapshot exists // Verify the snapshot exists
List<ModelSnapshot> deleteCandidates; jobProvider.modelSnapshots(
deleteCandidates = jobProvider.modelSnapshots( request.getJobId(), 0, 1, null, null, null, true, request.getSnapshotId(), null,
request.getJobId(), 0, 1, null, null, null, true, request.getSnapshotId(), null page -> {
).results(); List<ModelSnapshot> deleteCandidates = page.results();
if (deleteCandidates.size() > 1) {
logger.warn("More than one model found for [job_id: " + request.getJobId()
+ ", snapshot_id: " + request.getSnapshotId() + "] tuple.");
}
if (deleteCandidates.size() > 1) { if (deleteCandidates.isEmpty()) {
logger.warn("More than one model found for [job_id: " + request.getJobId() listener.onFailure(new ResourceNotFoundException(
+ ", snapshot_id: " + request.getSnapshotId() + "] tuple."); Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, request.getJobId())));
} }
ModelSnapshot deleteCandidate = deleteCandidates.get(0);
if (deleteCandidates.isEmpty()) { // Verify the snapshot is not being used
throw new ResourceNotFoundException(Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, request.getJobId())); //
} // NORELEASE: technically, this could be stale and refuse a delete, but I think that's acceptable
ModelSnapshot deleteCandidate = deleteCandidates.get(0); // since it is non-destructive
QueryPage<Job> job = jobManager.getJob(request.getJobId(), clusterService.state());
if (job.count() > 0) {
String currentModelInUse = job.results().get(0).getModelSnapshotId();
if (currentModelInUse != null && currentModelInUse.equals(request.getSnapshotId())) {
throw new IllegalArgumentException(Messages.getMessage(Messages.REST_CANNOT_DELETE_HIGHEST_PRIORITY,
request.getSnapshotId(), request.getJobId()));
}
}
// Verify the snapshot is not being used // Delete the snapshot and any associated state files
// JobDataDeleter deleter = bulkDeleterFactory.apply(request.getJobId());
// NORELEASE: technically, this could be stale and refuse a delete, but I think that's acceptable deleter.deleteModelSnapshot(deleteCandidate);
// since it is non-destructive deleter.commit(new ActionListener<BulkResponse>() {
QueryPage<Job> job = jobManager.getJob(request.getJobId(), clusterService.state()); @Override
if (job.count() > 0) { public void onResponse(BulkResponse bulkResponse) {
String currentModelInUse = job.results().get(0).getModelSnapshotId(); // We don't care about the bulk response, just that it succeeded
if (currentModelInUse != null && currentModelInUse.equals(request.getSnapshotId())) { listener.onResponse(new DeleteModelSnapshotAction.Response(true));
throw new IllegalArgumentException(Messages.getMessage(Messages.REST_CANNOT_DELETE_HIGHEST_PRIORITY, }
request.getSnapshotId(), request.getJobId()));
}
}
// Delete the snapshot and any associated state files @Override
JobDataDeleter deleter = bulkDeleterFactory.apply(request.getJobId()); public void onFailure(Exception e) {
deleter.deleteModelSnapshot(deleteCandidate); listener.onFailure(e);
deleter.commit(new ActionListener<BulkResponse>() { }
@Override });
public void onResponse(BulkResponse bulkResponse) {
// We don't care about the bulk response, just that it succeeded
listener.onResponse(new DeleteModelSnapshotAction.Response(true));
}
@Override jobManager.audit(request.getJobId()).info(Messages.getMessage(Messages.JOB_AUDIT_SNAPSHOT_DELETED,
public void onFailure(Exception e) { deleteCandidate.getDescription()));
listener.onFailure(e); }, listener::onFailure);
}
});
jobManager.audit(request.getJobId()).info(Messages.getMessage(Messages.JOB_AUDIT_SNAPSHOT_DELETED,
deleteCandidate.getDescription()));
} }
} }
} }

View File

@ -326,26 +326,20 @@ extends Action<GetModelSnapshotsAction.Request, GetModelSnapshotsAction.Response
request.getJobId(), request.pageParams.getFrom(), request.pageParams.getSize(), request.getStart(), request.getEnd(), request.getJobId(), request.pageParams.getFrom(), request.pageParams.getSize(), request.getStart(), request.getEnd(),
request.getSort(), request.getDescOrder(), request.getDescriptionString())); request.getSort(), request.getDescOrder(), request.getDescriptionString()));
QueryPage<ModelSnapshot> page = doGetPage(jobProvider, request); jobProvider.modelSnapshots(request.getJobId(), request.pageParams.getFrom(), request.pageParams.getSize(),
request.getStart(), request.getEnd(), request.getSort(), request.getDescOrder(), null, request.getDescriptionString(),
logger.debug(String.format(Locale.ROOT, "Return %d model snapshots for job %s", page.count(), request.getJobId())); page -> {
listener.onResponse(new Response(page)); clearQuantiles(page);
listener.onResponse(new Response(page));
}, listener::onFailure);
} }
public static QueryPage<ModelSnapshot> doGetPage(JobProvider jobProvider, Request request) { public static void clearQuantiles(QueryPage<ModelSnapshot> page) {
QueryPage<ModelSnapshot> page = jobProvider.modelSnapshots(request.getJobId(), request.pageParams.getFrom(),
request.pageParams.getSize(), request.getStart(), request.getEnd(), request.getSort(), request.getDescOrder(), null,
request.getDescriptionString());
// The quantiles can be large, and totally dominate the output -
// it's
// clearer to remove them
if (page.results() != null) { if (page.results() != null) {
for (ModelSnapshot modelSnapshot : page.results()) { for (ModelSnapshot modelSnapshot : page.results()) {
modelSnapshot.setQuantiles(null); modelSnapshot.setQuantiles(null);
} }
} }
return page;
} }
} }

View File

@ -53,6 +53,7 @@ import java.io.IOException;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.function.Consumer;
import static org.elasticsearch.action.ValidateActions.addValidationError; import static org.elasticsearch.action.ValidateActions.addValidationError;
@ -353,30 +354,35 @@ extends Action<RevertModelSnapshotAction.Request, RevertModelSnapshotAction.Resp
throw ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_REVERT)); throw ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_REVERT));
} }
ModelSnapshot modelSnapshot = getModelSnapshot(request, jobProvider); getModelSnapshot(request, jobProvider, modelSnapshot -> {
if (request.getDeleteInterveningResults()) { ActionListener<Response> wrappedListener = listener;
listener = wrapDeleteOldDataListener(listener, modelSnapshot, request.getJobId()); if (request.getDeleteInterveningResults()) {
listener = wrapRevertDataCountsListener(listener, modelSnapshot, request.getJobId()); wrappedListener = wrapDeleteOldDataListener(wrappedListener, modelSnapshot, request.getJobId());
} wrappedListener = wrapRevertDataCountsListener(wrappedListener, modelSnapshot, request.getJobId());
jobManager.revertSnapshot(request, listener, modelSnapshot); }
jobManager.revertSnapshot(request, wrappedListener, modelSnapshot);
}, listener::onFailure);
} }
private ModelSnapshot getModelSnapshot(Request request, JobProvider provider) { private void getModelSnapshot(Request request, JobProvider provider, Consumer<ModelSnapshot> handler,
Consumer<Exception> errorHandler) {
logger.info("Reverting to snapshot '" + request.getSnapshotId() + "' for time '" + request.getTime() + "'"); logger.info("Reverting to snapshot '" + request.getSnapshotId() + "' for time '" + request.getTime() + "'");
List<ModelSnapshot> revertCandidates; provider.modelSnapshots(request.getJobId(), 0, 1, null, request.getTime(),
revertCandidates = provider.modelSnapshots(request.getJobId(), 0, 1, null, request.getTime(), ModelSnapshot.TIMESTAMP.getPreferredName(), true, request.getSnapshotId(), request.getDescription(),
ModelSnapshot.TIMESTAMP.getPreferredName(), true, request.getSnapshotId(), request.getDescription()).results(); page -> {
List<ModelSnapshot> revertCandidates = page.results();
if (revertCandidates == null || revertCandidates.isEmpty()) {
throw new ResourceNotFoundException(
Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, request.getJobId()));
}
ModelSnapshot modelSnapshot = revertCandidates.get(0);
if (revertCandidates == null || revertCandidates.isEmpty()) { // The quantiles can be large, and totally dominate the output -
throw new ResourceNotFoundException(Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, request.getJobId())); // it's clearer to remove them
} modelSnapshot.setQuantiles(null);
ModelSnapshot modelSnapshot = revertCandidates.get(0); handler.accept(modelSnapshot);
}, errorHandler);
// The quantiles can be large, and totally dominate the output -
// it's clearer to remove them
modelSnapshot.setQuantiles(null);
return modelSnapshot;
} }
private ActionListener<RevertModelSnapshotAction.Response> wrapDeleteOldDataListener( private ActionListener<RevertModelSnapshotAction.Response> wrapDeleteOldDataListener(

View File

@ -41,6 +41,7 @@ import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.function.Consumer;
public class UpdateModelSnapshotAction extends public class UpdateModelSnapshotAction extends
Action<UpdateModelSnapshotAction.Request, UpdateModelSnapshotAction.Response, Action<UpdateModelSnapshotAction.Request, UpdateModelSnapshotAction.Response,
@ -262,49 +263,56 @@ UpdateModelSnapshotAction.RequestBuilder> {
@Override @Override
protected void doExecute(Request request, ActionListener<Response> listener) { protected void doExecute(Request request, ActionListener<Response> listener) {
logger.debug("Received request to change model snapshot description using '" + request.getDescriptionString() logger.debug("Received request to change model snapshot description using '" + request.getDescriptionString()
+ "' for snapshot ID '" + request.getSnapshotId() + "' for job '" + request.getJobId() + "'"); + "' for snapshot ID '" + request.getSnapshotId() + "' for job '" + request.getJobId() + "'");
getChangeCandidates(request, changeCandidates -> {
checkForClashes(request, aVoid -> {
if (changeCandidates.size() > 1) {
logger.warn("More than one model found for [{}: {}, {}: {}] tuple.", Job.ID.getPreferredName(), request.getJobId(),
ModelSnapshot.SNAPSHOT_ID.getPreferredName(), request.getSnapshotId());
}
ModelSnapshot modelSnapshot = changeCandidates.get(0);
modelSnapshot.setDescription(request.getDescriptionString());
jobManager.updateModelSnapshot(request.getJobId(), modelSnapshot, false);
List<ModelSnapshot> changeCandidates = getChangeCandidates(request); modelSnapshot.setDescription(request.getDescriptionString());
checkForClashes(request);
if (changeCandidates.size() > 1) { // The quantiles can be large, and totally dominate the output -
logger.warn("More than one model found for [{}: {}, {}: {}] tuple.", Job.ID.getPreferredName(), request.getJobId(), // it's clearer to remove them
ModelSnapshot.SNAPSHOT_ID.getPreferredName(), request.getSnapshotId()); modelSnapshot.setQuantiles(null);
}
ModelSnapshot modelSnapshot = changeCandidates.get(0);
modelSnapshot.setDescription(request.getDescriptionString());
jobManager.updateModelSnapshot(request.getJobId(), modelSnapshot, false);
modelSnapshot.setDescription(request.getDescriptionString());
// The quantiles can be large, and totally dominate the output -
// it's clearer to remove them
modelSnapshot.setQuantiles(null);
listener.onResponse(new Response(modelSnapshot));
listener.onResponse(new Response(modelSnapshot));
}, listener::onFailure);
}, listener::onFailure);
} }
private List<ModelSnapshot> getChangeCandidates(Request request) { private void getChangeCandidates(Request request, Consumer<List<ModelSnapshot>> handler, Consumer<Exception> errorHandler) {
List<ModelSnapshot> changeCandidates = getModelSnapshots(request.getJobId(), request.getSnapshotId(), null); getModelSnapshots(request.getJobId(), request.getSnapshotId(), null,
if (changeCandidates == null || changeCandidates.isEmpty()) { changeCandidates -> {
throw new ResourceNotFoundException(Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, request.getJobId())); if (changeCandidates == null || changeCandidates.isEmpty()) {
} errorHandler.accept(new ResourceNotFoundException(
return changeCandidates; Messages.getMessage(Messages.REST_NO_SUCH_MODEL_SNAPSHOT, request.getJobId())));
} else {
handler.accept(changeCandidates);
}
}, errorHandler);
} }
private void checkForClashes(Request request) { private void checkForClashes(Request request, Consumer<Void> handler, Consumer<Exception> errorHandler) {
List<ModelSnapshot> clashCandidates = getModelSnapshots(request.getJobId(), null, request.getDescriptionString()); getModelSnapshots(request.getJobId(), null, request.getDescriptionString(), clashCandidates -> {
if (clashCandidates != null && !clashCandidates.isEmpty()) { if (clashCandidates != null && !clashCandidates.isEmpty()) {
throw new IllegalArgumentException(Messages.getMessage( errorHandler.accept(new IllegalArgumentException(Messages.getMessage(
Messages.REST_DESCRIPTION_ALREADY_USED, request.getDescriptionString(), request.getJobId())); Messages.REST_DESCRIPTION_ALREADY_USED, request.getDescriptionString(), request.getJobId())));
} } else {
handler.accept(null);
}
}, errorHandler);
} }
private List<ModelSnapshot> getModelSnapshots(String jobId, String snapshotId, String description) { private void getModelSnapshots(String jobId, String snapshotId, String description,
return jobProvider.modelSnapshots(jobId, 0, 1, null, null, null, true, snapshotId, description).results(); Consumer<List<ModelSnapshot>> handler, Consumer<Exception> errorHandler) {
jobProvider.modelSnapshots(jobId, 0, 1, null, null, null, true, snapshotId, description,
page -> handler.accept(page.results()), errorHandler);
} }
} }

View File

@ -20,8 +20,10 @@ import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
@ -37,7 +39,6 @@ import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermsQueryBuilder; import org.elasticsearch.index.query.TermsQueryBuilder;
@ -788,7 +789,9 @@ public class JobProvider {
* @return page of model snapshots * @return page of model snapshots
*/ */
public QueryPage<ModelSnapshot> modelSnapshots(String jobId, int from, int size) { public QueryPage<ModelSnapshot> modelSnapshots(String jobId, int from, int size) {
return modelSnapshots(jobId, from, size, null, null, null, true, null, null); PlainActionFuture<QueryPage<ModelSnapshot>> future = PlainActionFuture.newFuture();
modelSnapshots(jobId, from, size, null, false, QueryBuilders.matchAllQuery(), future);
return future.actionGet();
} }
/** /**
@ -803,21 +806,28 @@ public class JobProvider {
* @param sortDescending Sort in descending order * @param sortDescending Sort in descending order
* @param snapshotId optional snapshot ID to match (null for all) * @param snapshotId optional snapshot ID to match (null for all)
* @param description optional description to match (null for all) * @param description optional description to match (null for all)
* @return page of model snapshots
*/ */
public QueryPage<ModelSnapshot> modelSnapshots(String jobId, int from, int size, public void modelSnapshots(String jobId,
String startEpochMs, String endEpochMs, String sortField, boolean sortDescending, int from,
String snapshotId, String description) { int size,
String startEpochMs,
String endEpochMs,
String sortField,
boolean sortDescending,
String snapshotId,
String description,
CheckedConsumer<QueryPage<ModelSnapshot>, Exception> handler,
Consumer<Exception> errorHandler) {
boolean haveId = snapshotId != null && !snapshotId.isEmpty(); boolean haveId = snapshotId != null && !snapshotId.isEmpty();
boolean haveDescription = description != null && !description.isEmpty(); boolean haveDescription = description != null && !description.isEmpty();
ResultsFilterBuilder fb; ResultsFilterBuilder fb;
if (haveId || haveDescription) { if (haveId || haveDescription) {
BoolQueryBuilder query = QueryBuilders.boolQuery(); BoolQueryBuilder query = QueryBuilders.boolQuery();
if (haveId) { if (haveId) {
query.must(QueryBuilders.termQuery(ModelSnapshot.SNAPSHOT_ID.getPreferredName(), snapshotId)); query.filter(QueryBuilders.termQuery(ModelSnapshot.SNAPSHOT_ID.getPreferredName(), snapshotId));
} }
if (haveDescription) { if (haveDescription) {
query.must(QueryBuilders.termQuery(ModelSnapshot.DESCRIPTION.getPreferredName(), description)); query.filter(QueryBuilders.termQuery(ModelSnapshot.DESCRIPTION.getPreferredName(), description));
} }
fb = new ResultsFilterBuilder(query); fb = new ResultsFilterBuilder(query);
@ -825,54 +835,52 @@ public class JobProvider {
fb = new ResultsFilterBuilder(); fb = new ResultsFilterBuilder();
} }
return modelSnapshots(jobId, from, size, QueryBuilder qb = fb.timeRange(Bucket.TIMESTAMP.getPreferredName(), startEpochMs, endEpochMs).build();
(sortField == null || sortField.isEmpty()) ? ModelSnapshot.RESTORE_PRIORITY.getPreferredName() : sortField, modelSnapshots(jobId, from, size, sortField, sortDescending, qb, ActionListener.wrap(handler, errorHandler));
sortDescending, fb.timeRange(
Bucket.TIMESTAMP.getPreferredName(), startEpochMs, endEpochMs).build());
} }
private QueryPage<ModelSnapshot> modelSnapshots(String jobId, int from, int size, private void modelSnapshots(String jobId,
String sortField, boolean sortDescending, QueryBuilder qb) { int from,
int size,
String sortField,
boolean sortDescending,
QueryBuilder qb,
ActionListener<QueryPage<ModelSnapshot>> listener) {
if (Strings.isEmpty(sortField)) {
sortField = ModelSnapshot.RESTORE_PRIORITY.getPreferredName();
}
FieldSortBuilder sb = new FieldSortBuilder(sortField) FieldSortBuilder sb = new FieldSortBuilder(sortField)
.order(sortDescending ? SortOrder.DESC : SortOrder.ASC); .order(sortDescending ? SortOrder.DESC : SortOrder.ASC);
// Wrap in a constant_score because we always want to String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);
// run it as a filter LOGGER.trace("ES API CALL: search all of type {} from index {} sort ascending {} with filter after sort from {} size {}",
qb = new ConstantScoreQueryBuilder(qb); ModelSnapshot.TYPE, indexName, sortField, from, size);
SearchResponse searchResponse; SearchRequest searchRequest = new SearchRequest(indexName);
try { searchRequest.types(ModelSnapshot.TYPE.getPreferredName());
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
LOGGER.trace("ES API CALL: search all of type {} from index {} sort ascending {} with filter after sort from {} size {}", sourceBuilder.sort(sb);
ModelSnapshot.TYPE, indexName, sortField, from, size); sourceBuilder.query(qb);
sourceBuilder.from(from);
SearchRequest searchRequest = new SearchRequest(indexName); sourceBuilder.size(size);
searchRequest.types(ModelSnapshot.TYPE.getPreferredName()); searchRequest.source(sourceBuilder);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); client.search(searchRequest, ActionListener.wrap(searchResponse -> {
sourceBuilder.sort(sb); List<ModelSnapshot> results = new ArrayList<>();
sourceBuilder.query(qb); for (SearchHit hit : searchResponse.getHits().getHits()) {
sourceBuilder.from(from); BytesReference source = hit.getSourceRef();
sourceBuilder.size(size); try (XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source)) {
searchRequest.source(sourceBuilder); ModelSnapshot modelSnapshot = ModelSnapshot.PARSER.apply(parser, () -> parseFieldMatcher);
searchResponse = FixBlockingClientOperations.executeBlocking(client, SearchAction.INSTANCE, searchRequest); results.add(modelSnapshot);
} catch (IndexNotFoundException e) { } catch (IOException e) {
LOGGER.error("Failed to read modelSnapshots", e); throw new ElasticsearchParseException("failed to parse modelSnapshot", e);
throw e; }
}
List<ModelSnapshot> results = new ArrayList<>();
for (SearchHit hit : searchResponse.getHits().getHits()) {
BytesReference source = hit.getSourceRef();
try (XContentParser parser = XContentFactory.xContent(source).createParser(NamedXContentRegistry.EMPTY, source)) {
ModelSnapshot modelSnapshot = ModelSnapshot.PARSER.apply(parser, () -> parseFieldMatcher);
results.add(modelSnapshot);
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse modelSnapshot", e);
} }
}
return new QueryPage<>(results, searchResponse.getHits().getTotalHits(), ModelSnapshot.RESULTS_FIELD); QueryPage<ModelSnapshot> result =
new QueryPage<>(results, searchResponse.getHits().getTotalHits(), ModelSnapshot.RESULTS_FIELD);
listener.onResponse(result);
}, listener::onFailure));
} }
/** /**

View File

@ -927,13 +927,8 @@ public class JobProviderTests extends ESTestCase {
int from = 4; int from = 4;
int size = 3; int size = 3;
ArgumentCaptor<QueryBuilder> queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class);
SearchResponse response = createSearchResponse(true, source); SearchResponse response = createSearchResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() Client client = getMockedClient(qb -> {}, response);
.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
ModelSnapshot.TYPE.getPreferredName(), from, size, response, queryBuilder);
Client client = clientBuilder.build();
JobProvider provider = createProvider(client); JobProvider provider = createProvider(client);
QueryPage<ModelSnapshot> page = provider.modelSnapshots(jobId, from, size); QueryPage<ModelSnapshot> page = provider.modelSnapshots(jobId, from, size);
@ -956,7 +951,6 @@ public class JobProviderTests extends ESTestCase {
assertEquals(6, snapshots.get(1).getSnapshotDocCount()); assertEquals(6, snapshots.get(1).getSnapshotDocCount());
} }
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testModelSnapshots_WithDescription() public void testModelSnapshots_WithDescription()
throws InterruptedException, ExecutionException, IOException { throws InterruptedException, ExecutionException, IOException {
String jobId = "TestJobIdentificationForInfluencers"; String jobId = "TestJobIdentificationForInfluencers";
@ -984,17 +978,16 @@ public class JobProviderTests extends ESTestCase {
int from = 4; int from = 4;
int size = 3; int size = 3;
ArgumentCaptor<QueryBuilder> queryBuilder = ArgumentCaptor.forClass(QueryBuilder.class); QueryBuilder[] qbHolder = new QueryBuilder[1];
SearchResponse response = createSearchResponse(true, source); SearchResponse response = createSearchResponse(true, source);
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse() Client client = getMockedClient(qb -> qbHolder[0] = qb, response);
.prepareSearch(AnomalyDetectorsIndex.jobResultsIndexName(jobId),
ModelSnapshot.TYPE.getPreferredName(), from, size, response, queryBuilder);
Client client = clientBuilder.build();
JobProvider provider = createProvider(client); JobProvider provider = createProvider(client);
QueryPage<ModelSnapshot> page = provider.modelSnapshots(jobId, from, size, null, null, "sortfield", true, "snappyId", @SuppressWarnings({"unchecked", "rawtypes"})
"description1"); QueryPage<ModelSnapshot>[] hodor = new QueryPage[1];
provider.modelSnapshots(jobId, from, size, null, null, "sortfield", true, "snappyId", "description1",
p -> hodor[0] = p, RuntimeException::new);
QueryPage<ModelSnapshot> page = hodor[0];
assertEquals(2L, page.count()); assertEquals(2L, page.count());
List<ModelSnapshot> snapshots = page.results(); List<ModelSnapshot> snapshots = page.results();
@ -1012,7 +1005,7 @@ public class JobProviderTests extends ESTestCase {
assertEquals(999L, snapshots.get(1).getRestorePriority()); assertEquals(999L, snapshots.get(1).getRestorePriority());
assertEquals(6, snapshots.get(1).getSnapshotDocCount()); assertEquals(6, snapshots.get(1).getSnapshotDocCount());
String queryString = queryBuilder.getValue().toString(); String queryString = qbHolder[0].toString();
assertTrue(queryString.matches("(?s).*snapshot_id.*value. : .snappyId.*description.*value. : .description1.*")); assertTrue(queryString.matches("(?s).*snapshot_id.*value. : .snappyId.*description.*value. : .description1.*"));
} }
@ -1090,7 +1083,6 @@ public class JobProviderTests extends ESTestCase {
assertEquals(0.0, buckets.get(3).getMaxNormalizedProbability(), 0.001); assertEquals(0.0, buckets.get(3).getMaxNormalizedProbability(), 0.001);
} }
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/prelert-legacy/issues/127")
public void testRestoreStateToStream() throws Exception { public void testRestoreStateToStream() throws Exception {
Map<String, Object> categorizerState = new HashMap<>(); Map<String, Object> categorizerState = new HashMap<>();
categorizerState.put("catName", "catVal"); categorizerState.put("catName", "catVal");

View File

@ -5,17 +5,16 @@
*/ */
package org.elasticsearch.xpack.prelert.modelsnapshots; package org.elasticsearch.xpack.prelert.modelsnapshots;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.prelert.action.GetModelSnapshotsAction; import org.elasticsearch.xpack.prelert.action.GetModelSnapshotsAction;
import org.elasticsearch.xpack.prelert.job.ModelSnapshot; import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage; import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles;
import org.elasticsearch.xpack.prelert.job.results.PageParams; import org.elasticsearch.xpack.prelert.job.results.PageParams;
import java.util.Collections; import java.util.Arrays;
import java.util.Date;
import static org.elasticsearch.mock.orig.Mockito.when;
import static org.mockito.Mockito.mock;
public class GetModelSnapshotsTests extends ESTestCase { public class GetModelSnapshotsTests extends ESTestCase {
@ -31,71 +30,16 @@ public class GetModelSnapshotsTests extends ESTestCase {
assertEquals("Parameter [size] cannot be < 0", e.getMessage()); assertEquals("Parameter [size] cannot be < 0", e.getMessage());
} }
public void testModelSnapshots_GivenNoStartOrEndParams() { public void testModelSnapshots_clearQuantiles() {
ModelSnapshot modelSnapshot = new ModelSnapshot(randomAsciiOfLengthBetween(1, 20)); ModelSnapshot m1 = new ModelSnapshot("jobId");
QueryPage<ModelSnapshot> queryResult = new QueryPage<>(Collections.singletonList(modelSnapshot), 300, ModelSnapshot.RESULTS_FIELD); m1.setQuantiles(new Quantiles("jobId", new Date(), "quantileState"));
ModelSnapshot m2 = new ModelSnapshot("jobId");
JobProvider jobProvider = mock(JobProvider.class); QueryPage<ModelSnapshot> page = new QueryPage<>(Arrays.asList(m1, m2), 2, new ParseField("field"));
when(jobProvider.modelSnapshots("foo", 0, 100, null, null, null, true, null, null)).thenReturn(queryResult); GetModelSnapshotsAction.TransportAction.clearQuantiles(page);
assertEquals(2, page.results().size());
GetModelSnapshotsAction.Request request = new GetModelSnapshotsAction.Request("foo"); for (ModelSnapshot modelSnapshot : page.results()) {
request.setPageParams(new PageParams(0, 100)); assertNull(modelSnapshot.getQuantiles());
request.setDescOrder(true); }
QueryPage<ModelSnapshot> page = GetModelSnapshotsAction.TransportAction.doGetPage(jobProvider, request);
assertEquals(300, page.count());
}
public void testModelSnapshots_GivenEpochStartAndEpochEndParams() {
ModelSnapshot modelSnapshot = new ModelSnapshot(randomAsciiOfLengthBetween(1, 20));
QueryPage<ModelSnapshot> queryResult = new QueryPage<>(Collections.singletonList(modelSnapshot), 300, ModelSnapshot.RESULTS_FIELD);
JobProvider jobProvider = mock(JobProvider.class);
when(jobProvider.modelSnapshots("foo", 0, 100, "1", "2", null, true, null, null)).thenReturn(queryResult);
GetModelSnapshotsAction.Request request = new GetModelSnapshotsAction.Request("foo");
request.setPageParams(new PageParams(0, 100));
request.setStart("1");
request.setEnd("2");
request.setDescOrder(true);
QueryPage<ModelSnapshot> page = GetModelSnapshotsAction.TransportAction.doGetPage(jobProvider, request);
assertEquals(300, page.count());
}
public void testModelSnapshots_GivenIsoWithMillisStartAndEpochEndParams() {
ModelSnapshot modelSnapshot = new ModelSnapshot(randomAsciiOfLengthBetween(1, 20));
QueryPage<ModelSnapshot> queryResult = new QueryPage<>(Collections.singletonList(modelSnapshot), 300, ModelSnapshot.RESULTS_FIELD);
JobProvider jobProvider = mock(JobProvider.class);
when(jobProvider.modelSnapshots("foo", 0, 100, "2015-01-01T12:00:00.042Z", "2015-01-01T13:00:00.142+00:00", null, true, null, null))
.thenReturn(queryResult);
GetModelSnapshotsAction.Request request = new GetModelSnapshotsAction.Request("foo");
request.setPageParams(new PageParams(0, 100));
request.setStart("2015-01-01T12:00:00.042Z");
request.setEnd("2015-01-01T13:00:00.142+00:00");
request.setDescOrder(true);
QueryPage<ModelSnapshot> page = GetModelSnapshotsAction.TransportAction.doGetPage(jobProvider, request);
assertEquals(300, page.count());
}
public void testModelSnapshots_GivenIsoWithoutMillisStartAndEpochEndParams() {
ModelSnapshot modelSnapshot = new ModelSnapshot(randomAsciiOfLengthBetween(1, 20));
QueryPage<ModelSnapshot> queryResult = new QueryPage<>(Collections.singletonList(modelSnapshot), 300, ModelSnapshot.RESULTS_FIELD);
JobProvider jobProvider = mock(JobProvider.class);
when(jobProvider.modelSnapshots("foo", 0, 100, "2015-01-01T12:00:00Z", "2015-01-01T13:00:00Z", null, true, null, null))
.thenReturn(queryResult);
GetModelSnapshotsAction.Request request = new GetModelSnapshotsAction.Request("foo");
request.setPageParams(new PageParams(0, 100));
request.setStart("2015-01-01T12:00:00Z");
request.setEnd("2015-01-01T13:00:00Z");
request.setDescOrder(true);
QueryPage<ModelSnapshot> page = GetModelSnapshotsAction.TransportAction.doGetPage(jobProvider, request);
assertEquals(300, page.count());
} }
} }