Move JobResultsPersister.getJobIndexName to new class AnomalyDetectorsIndex (elastic/elasticsearch#567)
Original commit: elastic/x-pack-elasticsearch@bf54dd56a9
This commit is contained in:
parent
e3ec908828
commit
aacb6f3949
|
@ -0,0 +1,21 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.prelert.job.persistence;
|
||||
|
||||
/**
|
||||
* Methods for handling index naming related functions
|
||||
*/
|
||||
public final class AnomalyDetectorsIndex {
|
||||
private static final String INDEX_PREFIX = "prelertresults-";
|
||||
|
||||
private AnomalyDetectorsIndex() {
|
||||
}
|
||||
|
||||
public static String getJobIndexName(String jobId) {
|
||||
return INDEX_PREFIX + jobId;
|
||||
}
|
||||
|
||||
}
|
|
@ -19,7 +19,7 @@ import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
|
|||
|
||||
class ElasticsearchBatchedModelSnapshotIterator extends ElasticsearchBatchedDocumentsIterator<ModelSnapshot> {
|
||||
public ElasticsearchBatchedModelSnapshotIterator(Client client, String jobId, ParseFieldMatcher parserFieldMatcher) {
|
||||
super(client, JobResultsPersister.getJobIndexName(jobId), parserFieldMatcher);
|
||||
super(client, AnomalyDetectorsIndex.getJobIndexName(jobId), parserFieldMatcher);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -13,7 +13,7 @@ import org.elasticsearch.xpack.prelert.job.results.Result;
|
|||
abstract class ElasticsearchBatchedResultsIterator<T> extends ElasticsearchBatchedDocumentsIterator<T> {
|
||||
|
||||
public ElasticsearchBatchedResultsIterator(Client client, String jobId, String resultType, ParseFieldMatcher parseFieldMatcher) {
|
||||
super(client, JobResultsPersister.getJobIndexName(jobId), parseFieldMatcher,
|
||||
super(client, AnomalyDetectorsIndex.getJobIndexName(jobId), parseFieldMatcher,
|
||||
new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), resultType));
|
||||
}
|
||||
|
||||
|
|
|
@ -19,7 +19,6 @@ import org.elasticsearch.xpack.prelert.job.DataCounts;
|
|||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister.getJobIndexName;
|
||||
|
||||
/**
|
||||
* Update a job's dataCounts
|
||||
|
@ -49,7 +48,8 @@ public class JobDataCountsPersister extends AbstractComponent {
|
|||
public void persistDataCounts(String jobId, DataCounts counts, ActionListener<Boolean> listener) {
|
||||
try {
|
||||
XContentBuilder content = serialiseCounts(counts);
|
||||
client.prepareIndex(getJobIndexName(jobId), DataCounts.TYPE.getPreferredName(), jobId + DataCounts.DOCUMENT_SUFFIX)
|
||||
client.prepareIndex(AnomalyDetectorsIndex.getJobIndexName(jobId), DataCounts.TYPE.getPreferredName(),
|
||||
jobId + DataCounts.DOCUMENT_SUFFIX)
|
||||
.setSource(content).execute(new ActionListener<IndexResponse>() {
|
||||
@Override
|
||||
public void onResponse(IndexResponse indexResponse) {
|
||||
|
|
|
@ -67,7 +67,7 @@ public class JobDataDeleter {
|
|||
* @param listener Response listener
|
||||
*/
|
||||
public void deleteResultsFromTime(long cutoffEpochMs, ActionListener<Boolean> listener) {
|
||||
String index = JobResultsPersister.getJobIndexName(jobId);
|
||||
String index = AnomalyDetectorsIndex.getJobIndexName(jobId);
|
||||
|
||||
RangeQueryBuilder timeRange = QueryBuilders.rangeQuery(Bucket.TIMESTAMP.getPreferredName());
|
||||
timeRange.gte(cutoffEpochMs);
|
||||
|
@ -108,7 +108,7 @@ public class JobDataDeleter {
|
|||
public void deleteModelSnapshot(ModelSnapshot modelSnapshot) {
|
||||
String snapshotId = modelSnapshot.getSnapshotId();
|
||||
int docCount = modelSnapshot.getSnapshotDocCount();
|
||||
String indexName = JobResultsPersister.getJobIndexName(jobId);
|
||||
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
|
||||
// Deduce the document IDs of the state documents from the information
|
||||
// in the snapshot document - we cannot query the state itself as it's
|
||||
// too big and has no mappings
|
||||
|
@ -126,7 +126,7 @@ public class JobDataDeleter {
|
|||
* Delete all results marked as interim
|
||||
*/
|
||||
public void deleteInterimResults() {
|
||||
String index = JobResultsPersister.getJobIndexName(jobId);
|
||||
String index = AnomalyDetectorsIndex.getJobIndexName(jobId);
|
||||
|
||||
QueryBuilder qb = QueryBuilders.termQuery(Bucket.IS_INTERIM.getPreferredName(), true);
|
||||
|
||||
|
|
|
@ -193,7 +193,7 @@ public class JobProvider {
|
|||
|
||||
String jobId = job.getId();
|
||||
LOGGER.trace("ES API CALL: create index {}", job.getId());
|
||||
CreateIndexRequest createIndexRequest = new CreateIndexRequest(JobResultsPersister.getJobIndexName(jobId));
|
||||
CreateIndexRequest createIndexRequest = new CreateIndexRequest(AnomalyDetectorsIndex.getJobIndexName(jobId));
|
||||
Settings.Builder settingsBuilder = prelertIndexSettings();
|
||||
createIndexRequest.settings(settingsBuilder);
|
||||
createIndexRequest.mapping(Result.TYPE.getPreferredName(), resultsMapping);
|
||||
|
@ -230,7 +230,7 @@ public class JobProvider {
|
|||
*/
|
||||
// TODO: should live together with createJobRelatedIndices (in case it moves)?
|
||||
public void deleteJobRelatedIndices(String jobId, ActionListener<DeleteJobAction.Response> listener) {
|
||||
String indexName = JobResultsPersister.getJobIndexName(jobId);
|
||||
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
|
||||
LOGGER.trace("ES API CALL: delete index {}", indexName);
|
||||
|
||||
try {
|
||||
|
@ -257,7 +257,7 @@ public class JobProvider {
|
|||
* @return The dataCounts or default constructed object if not found
|
||||
*/
|
||||
public DataCounts dataCounts(String jobId) {
|
||||
String indexName = JobResultsPersister.getJobIndexName(jobId);
|
||||
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
|
||||
|
||||
try {
|
||||
GetResponse response = client.prepareGet(indexName, DataCounts.TYPE.getPreferredName(),
|
||||
|
@ -349,7 +349,7 @@ public class JobProvider {
|
|||
|
||||
SearchResponse searchResponse;
|
||||
try {
|
||||
String indexName = JobResultsPersister.getJobIndexName(jobId);
|
||||
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
|
||||
LOGGER.trace("ES API CALL: search all of result type {} from index {} with filter from {} size {}",
|
||||
Bucket.RESULT_TYPE_VALUE, indexName, from, size);
|
||||
|
||||
|
@ -392,7 +392,7 @@ public class JobProvider {
|
|||
* @throws ResourceNotFoundException If the job id is not recognised
|
||||
*/
|
||||
public QueryPage<Bucket> bucket(String jobId, BucketQueryBuilder.BucketQuery query) throws ResourceNotFoundException {
|
||||
String indexName = JobResultsPersister.getJobIndexName(jobId);
|
||||
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
|
||||
SearchHits hits;
|
||||
try {
|
||||
LOGGER.trace("ES API CALL: get Bucket with timestamp {} from index {}", query.getTimestamp(), indexName);
|
||||
|
@ -475,7 +475,7 @@ public class JobProvider {
|
|||
.filter(new TermsQueryBuilder(AnomalyRecord.PARTITION_FIELD_VALUE.getPreferredName(), partitionFieldValue));
|
||||
|
||||
FieldSortBuilder sb = new FieldSortBuilder(Bucket.TIMESTAMP.getPreferredName()).order(SortOrder.ASC);
|
||||
String indexName = JobResultsPersister.getJobIndexName(jobId);
|
||||
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
|
||||
SearchRequestBuilder searchBuilder = client
|
||||
.prepareSearch(indexName)
|
||||
.setQuery(boolQuery)
|
||||
|
@ -601,7 +601,7 @@ public class JobProvider {
|
|||
* @return QueryPage of CategoryDefinition
|
||||
*/
|
||||
public QueryPage<CategoryDefinition> categoryDefinitions(String jobId, int from, int size) {
|
||||
String indexName = JobResultsPersister.getJobIndexName(jobId);
|
||||
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
|
||||
LOGGER.trace("ES API CALL: search all of type {} from index {} sort ascending {} from {} size {}",
|
||||
CategoryDefinition.TYPE.getPreferredName(), indexName, CategoryDefinition.CATEGORY_ID.getPreferredName(), from, size);
|
||||
|
||||
|
@ -641,7 +641,7 @@ public class JobProvider {
|
|||
* @return QueryPage CategoryDefinition
|
||||
*/
|
||||
public QueryPage<CategoryDefinition> categoryDefinition(String jobId, String categoryId) {
|
||||
String indexName = JobResultsPersister.getJobIndexName(jobId);
|
||||
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
|
||||
GetResponse response;
|
||||
|
||||
try {
|
||||
|
@ -708,7 +708,7 @@ public class JobProvider {
|
|||
private QueryPage<AnomalyRecord> records(String jobId, int from, int size,
|
||||
QueryBuilder recordFilter, FieldSortBuilder sb, List<String> secondarySort,
|
||||
boolean descending) throws ResourceNotFoundException {
|
||||
String indexName = JobResultsPersister.getJobIndexName(jobId);
|
||||
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
|
||||
|
||||
recordFilter = new BoolQueryBuilder()
|
||||
.filter(recordFilter)
|
||||
|
@ -775,7 +775,7 @@ public class JobProvider {
|
|||
|
||||
private QueryPage<Influencer> influencers(String jobId, int from, int size, QueryBuilder filterBuilder, String sortField,
|
||||
boolean sortDescending) throws ResourceNotFoundException {
|
||||
String indexName = JobResultsPersister.getJobIndexName(jobId);
|
||||
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
|
||||
LOGGER.trace("ES API CALL: search all of result type {} from index {}{} with filter from {} size {}",
|
||||
() -> Influencer.RESULT_TYPE_VALUE, () -> indexName,
|
||||
() -> (sortField != null) ?
|
||||
|
@ -855,7 +855,7 @@ public class JobProvider {
|
|||
* Get the persisted quantiles state for the job
|
||||
*/
|
||||
public Optional<Quantiles> getQuantiles(String jobId) {
|
||||
String indexName = JobResultsPersister.getJobIndexName(jobId);
|
||||
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
|
||||
try {
|
||||
LOGGER.trace("ES API CALL: get ID " + Quantiles.QUANTILES_ID +
|
||||
" type " + Quantiles.TYPE + " from index " + indexName);
|
||||
|
@ -935,7 +935,7 @@ public class JobProvider {
|
|||
|
||||
SearchResponse searchResponse;
|
||||
try {
|
||||
String indexName = JobResultsPersister.getJobIndexName(jobId);
|
||||
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
|
||||
LOGGER.trace("ES API CALL: search all of type {} from index {} sort ascending {} with filter after sort from {} size {}",
|
||||
ModelSnapshot.TYPE, indexName, sortField, from, size);
|
||||
|
||||
|
@ -976,7 +976,7 @@ public class JobProvider {
|
|||
* @param restoreStream the stream to write the state to
|
||||
*/
|
||||
public void restoreStateToStream(String jobId, ModelSnapshot modelSnapshot, OutputStream restoreStream) throws IOException {
|
||||
String indexName = JobResultsPersister.getJobIndexName(jobId);
|
||||
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
|
||||
|
||||
// First try to restore categorizer state. There are no snapshots for this, so the IDs simply
|
||||
// count up until a document is not found. It's NOT an error to have no categorizer state.
|
||||
|
@ -1051,7 +1051,7 @@ public class JobProvider {
|
|||
|
||||
SearchResponse searchResponse;
|
||||
try {
|
||||
String indexName = JobResultsPersister.getJobIndexName(jobId);
|
||||
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
|
||||
LOGGER.trace("ES API CALL: search result type {} from index {} from {}, size {}",
|
||||
ModelDebugOutput.RESULT_TYPE_VALUE, indexName, from, size);
|
||||
|
||||
|
@ -1085,7 +1085,7 @@ public class JobProvider {
|
|||
* Get the job's model size stats.
|
||||
*/
|
||||
public Optional<ModelSizeStats> modelSizeStats(String jobId) {
|
||||
String indexName = JobResultsPersister.getJobIndexName(jobId);
|
||||
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
|
||||
try {
|
||||
LOGGER.trace("ES API CALL: get result type {} ID {} from index {}",
|
||||
ModelSizeStats.RESULT_TYPE_VALUE, ModelSizeStats.RESULT_TYPE_FIELD, indexName);
|
||||
|
@ -1143,6 +1143,6 @@ public class JobProvider {
|
|||
* @return the {@code Auditor}
|
||||
*/
|
||||
public Auditor audit(String jobId) {
|
||||
return new Auditor(client, JobResultsPersister.getJobIndexName(jobId), jobId);
|
||||
return new Auditor(client, AnomalyDetectorsIndex.getJobIndexName(jobId), jobId);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -70,7 +70,7 @@ public class JobResultsPersister extends AbstractComponent {
|
|||
|
||||
private Builder (String jobId) {
|
||||
this.jobId = Objects.requireNonNull(jobId);
|
||||
indexName = getJobIndexName(jobId);
|
||||
indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
|
||||
bulkRequest = client.prepareBulk();
|
||||
}
|
||||
|
||||
|
@ -288,7 +288,7 @@ public class JobResultsPersister extends AbstractComponent {
|
|||
* @return True if successful
|
||||
*/
|
||||
public boolean commitWrites(String jobId) {
|
||||
String indexName = getJobIndexName(jobId);
|
||||
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
|
||||
// Refresh should wait for Lucene to make the data searchable
|
||||
logger.trace("[{}] ES API CALL: refresh index {}", jobId, indexName);
|
||||
client.admin().indices().refresh(new RefreshRequest(indexName)).actionGet();
|
||||
|
@ -308,12 +308,6 @@ public class JobResultsPersister extends AbstractComponent {
|
|||
return builder;
|
||||
}
|
||||
|
||||
private static final String INDEX_PREFIX = "prelertresults-";
|
||||
|
||||
public static String getJobIndexName(String jobId) {
|
||||
return INDEX_PREFIX + jobId;
|
||||
}
|
||||
|
||||
private class Persistable {
|
||||
|
||||
private final String jobId;
|
||||
|
@ -337,7 +331,7 @@ public class JobResultsPersister extends AbstractComponent {
|
|||
logCall();
|
||||
|
||||
try {
|
||||
String indexName = getJobIndexName(jobId);
|
||||
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
|
||||
client.prepareIndex(indexName, type, id)
|
||||
.setSource(toXContentBuilder(object))
|
||||
.execute().actionGet();
|
||||
|
@ -349,7 +343,7 @@ public class JobResultsPersister extends AbstractComponent {
|
|||
}
|
||||
|
||||
private void logCall() {
|
||||
String indexName = getJobIndexName(jobId);
|
||||
String indexName = AnomalyDetectorsIndex.getJobIndexName(jobId);
|
||||
if (id != null) {
|
||||
logger.trace("[{}] ES API CALL: index type {} to index {} with ID {}", jobId, type, indexName, id);
|
||||
} else {
|
||||
|
|
|
@ -48,7 +48,7 @@ public class UsagePersister extends AbstractComponent {
|
|||
|
||||
// update global count
|
||||
updateDocument(jobId, PRELERT_USAGE_INDEX, docId, bytesRead, fieldsRead, recordsRead);
|
||||
updateDocument(jobId, JobResultsPersister.getJobIndexName(jobId), docId, bytesRead,
|
||||
updateDocument(jobId, AnomalyDetectorsIndex.getJobIndexName(jobId), docId, bytesRead,
|
||||
fieldsRead, recordsRead);
|
||||
}
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.elasticsearch.xpack.prelert.job.DataCounts;
|
|||
import org.elasticsearch.xpack.prelert.job.DataDescription;
|
||||
import org.elasticsearch.xpack.prelert.job.Detector;
|
||||
import org.elasticsearch.xpack.prelert.job.Job;
|
||||
import org.elasticsearch.xpack.prelert.job.persistence.AnomalyDetectorsIndex;
|
||||
import org.elasticsearch.xpack.prelert.scheduler.SchedulerConfig;
|
||||
import org.elasticsearch.xpack.prelert.scheduler.SchedulerStatus;
|
||||
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
|
||||
|
@ -191,7 +192,7 @@ public class ScheduledJobsIT extends ESIntegTestCase {
|
|||
}
|
||||
|
||||
private DataCounts getDataCounts(String jobId) {
|
||||
GetResponse getResponse = client().prepareGet(JobResultsPersister.getJobIndexName(jobId),
|
||||
GetResponse getResponse = client().prepareGet(AnomalyDetectorsIndex.getJobIndexName(jobId),
|
||||
DataCounts.TYPE.getPreferredName(), jobId + "-data-counts").get();
|
||||
if (getResponse.isExists() == false) {
|
||||
return new DataCounts(jobId);
|
||||
|
|
|
@ -35,7 +35,7 @@ public class JobDataDeleterTests extends ESTestCase {
|
|||
BulkResponse bulkResponse = Mockito.mock(BulkResponse.class);
|
||||
|
||||
Client client = new MockClientBuilder("myCluster")
|
||||
.prepareSearchExecuteListener(JobResultsPersister.getJobIndexName("foo"), response)
|
||||
.prepareSearchExecuteListener(AnomalyDetectorsIndex.getJobIndexName("foo"), response)
|
||||
.prepareSearchScrollExecuteListener(response)
|
||||
.prepareBulk(bulkResponse).build();
|
||||
|
||||
|
|
|
@ -159,7 +159,7 @@ public class JobProviderTests extends ESTestCase {
|
|||
public void testCreateJobRelatedIndicies() {
|
||||
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
|
||||
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
|
||||
clientBuilder.createIndexRequest(JobResultsPersister.getJobIndexName("foo"), captor);
|
||||
clientBuilder.createIndexRequest(AnomalyDetectorsIndex.getJobIndexName("foo"), captor);
|
||||
|
||||
Job.Builder job = buildJobBuilder("foo");
|
||||
JobProvider provider = createProvider(clientBuilder.build());
|
||||
|
|
Loading…
Reference in New Issue