[ML] Add a write alias for persisting job results (elastic/x-pack-elasticsearch#1636)
This commit switches over to two index aliases per job: one for reading and one for writing. In the future this will allow the addition of a rollover endpoint for ML results indices. (Rollover is still not possible following this change, but the change to make it possible in the future should not be a breaking change now.) Relates elastic/x-pack-elasticsearch#1599 relates elastic/x-pack-elasticsearch#827 Original commit: elastic/x-pack-elasticsearch@d648f4631f
This commit is contained in:
parent
ce0315abc4
commit
41ef0b827f
|
@ -25,9 +25,9 @@ public final class AnomalyDetectorsIndex {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The name of the default index where the job's results are stored
|
* The name of the alias pointing to the indices where the job's results are stored
|
||||||
* @param jobId Job Id
|
* @param jobId Job Id
|
||||||
* @return The index name
|
* @return The read alias
|
||||||
*/
|
*/
|
||||||
public static String jobResultsAliasedName(String jobId) {
|
public static String jobResultsAliasedName(String jobId) {
|
||||||
return RESULTS_INDEX_PREFIX + jobId;
|
return RESULTS_INDEX_PREFIX + jobId;
|
||||||
|
@ -39,8 +39,9 @@ public final class AnomalyDetectorsIndex {
|
||||||
* @return The write alias
|
* @return The write alias
|
||||||
*/
|
*/
|
||||||
public static String resultsWriteAlias(String jobId) {
|
public static String resultsWriteAlias(String jobId) {
|
||||||
// TODO: Replace with an actual write alias
|
// ".write" rather than simply "write" to avoid the danger of clashing
|
||||||
return jobResultsAliasedName(jobId);
|
// with the read alias of a job whose name begins with "write-"
|
||||||
|
return RESULTS_INDEX_PREFIX + ".write-" + jobId;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -176,17 +176,18 @@ public class JobProvider {
|
||||||
public void createJobResultIndex(Job job, ClusterState state, final ActionListener<Boolean> finalListener) {
|
public void createJobResultIndex(Job job, ClusterState state, final ActionListener<Boolean> finalListener) {
|
||||||
Collection<String> termFields = (job.getAnalysisConfig() != null) ? job.getAnalysisConfig().termFields() : Collections.emptyList();
|
Collection<String> termFields = (job.getAnalysisConfig() != null) ? job.getAnalysisConfig().termFields() : Collections.emptyList();
|
||||||
|
|
||||||
String aliasName = AnomalyDetectorsIndex.jobResultsAliasedName(job.getId());
|
String readAliasName = AnomalyDetectorsIndex.jobResultsAliasedName(job.getId());
|
||||||
|
String writeAliasName = AnomalyDetectorsIndex.resultsWriteAlias(job.getId());
|
||||||
String indexName = job.getResultsIndexName();
|
String indexName = job.getResultsIndexName();
|
||||||
|
|
||||||
final ActionListener<Boolean> createAliasListener = ActionListener.wrap(success -> {
|
final ActionListener<Boolean> createAliasListener = ActionListener.wrap(success -> {
|
||||||
client.admin().indices().prepareAliases()
|
client.admin().indices().prepareAliases()
|
||||||
.addAlias(indexName, aliasName, QueryBuilders.termQuery(Job.ID.getPreferredName(), job.getId()))
|
.addAlias(indexName, readAliasName, QueryBuilders.termQuery(Job.ID.getPreferredName(), job.getId()))
|
||||||
|
.addAlias(indexName, writeAliasName)
|
||||||
// we could return 'success && r.isAcknowledged()' instead of 'true', but that makes
|
// we could return 'success && r.isAcknowledged()' instead of 'true', but that makes
|
||||||
// testing not possible as we can't create IndicesAliasesResponse instance or
|
// testing not possible as we can't create IndicesAliasesResponse instance or
|
||||||
// mock IndicesAliasesResponse#isAcknowledged()
|
// mock IndicesAliasesResponse#isAcknowledged()
|
||||||
.execute(ActionListener.wrap(r -> finalListener.onResponse(true),
|
.execute(ActionListener.wrap(r -> finalListener.onResponse(true), finalListener::onFailure));
|
||||||
finalListener::onFailure));
|
|
||||||
},
|
},
|
||||||
finalListener::onFailure);
|
finalListener::onFailure);
|
||||||
|
|
||||||
|
|
|
@ -50,11 +50,10 @@ public class JobStorageDeletionTask extends Task {
|
||||||
|
|
||||||
final String indexName = AnomalyDetectorsIndex.getPhysicalIndexFromState(state, jobId);
|
final String indexName = AnomalyDetectorsIndex.getPhysicalIndexFromState(state, jobId);
|
||||||
final String indexPattern = indexName + "-*";
|
final String indexPattern = indexName + "-*";
|
||||||
final String aliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
|
|
||||||
|
|
||||||
ActionListener<Boolean> deleteAliasHandler = ActionListener.wrap(finishedHandler, failureHandler);
|
ActionListener<Boolean> deleteAliasHandler = ActionListener.wrap(finishedHandler, failureHandler);
|
||||||
|
|
||||||
// Step 5. DBQ state done, delete the alias
|
// Step 5. DBQ state done, delete the aliases
|
||||||
ActionListener<BulkByScrollResponse> dbqHandler = ActionListener.wrap(
|
ActionListener<BulkByScrollResponse> dbqHandler = ActionListener.wrap(
|
||||||
bulkByScrollResponse -> {
|
bulkByScrollResponse -> {
|
||||||
if (bulkByScrollResponse.isTimedOut()) {
|
if (bulkByScrollResponse.isTimedOut()) {
|
||||||
|
@ -68,7 +67,7 @@ public class JobStorageDeletionTask extends Task {
|
||||||
logger.warn("DBQ failure: " + failure);
|
logger.warn("DBQ failure: " + failure);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
deleteAlias(jobId, aliasName, indexName, client, deleteAliasHandler);
|
deleteAliases(jobId, client, deleteAliasHandler);
|
||||||
},
|
},
|
||||||
failureHandler);
|
failureHandler);
|
||||||
|
|
||||||
|
@ -172,18 +171,27 @@ public class JobStorageDeletionTask extends Task {
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void deleteAlias(String jobId, String aliasName, String indexName, Client client, ActionListener<Boolean> finishedHandler ) {
|
private void deleteAliases(String jobId, Client client, ActionListener<Boolean> finishedHandler) {
|
||||||
IndicesAliasesRequest request = new IndicesAliasesRequest()
|
final String readAliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
|
||||||
.addAliasAction(IndicesAliasesRequest.AliasActions.remove().alias(aliasName).index(indexName));
|
final String writeAliasName = AnomalyDetectorsIndex.resultsWriteAlias(jobId);
|
||||||
|
final String indexPattern = AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*";
|
||||||
|
|
||||||
|
IndicesAliasesRequest request = new IndicesAliasesRequest().addAliasAction(
|
||||||
|
IndicesAliasesRequest.AliasActions.remove().aliases(readAliasName, writeAliasName).indices(indexPattern));
|
||||||
client.admin().indices().aliases(request, ActionListener.wrap(
|
client.admin().indices().aliases(request, ActionListener.wrap(
|
||||||
response -> finishedHandler.onResponse(true),
|
response -> finishedHandler.onResponse(true),
|
||||||
e -> {
|
e -> {
|
||||||
if (e instanceof AliasesNotFoundException || e instanceof IndexNotFoundException) {
|
if (e instanceof AliasesNotFoundException) {
|
||||||
logger.warn("[{}] Alias [{}] not found. Continuing to delete job.", jobId, aliasName);
|
logger.warn("[{}] Aliases {} not found. Continuing to delete job.", jobId,
|
||||||
|
((AliasesNotFoundException) e).getResourceId());
|
||||||
|
finishedHandler.onResponse(true);
|
||||||
|
} else if (e instanceof IndexNotFoundException) {
|
||||||
|
logger.warn("[{}] Index [{}] referenced by alias not found. Continuing to delete job.", jobId,
|
||||||
|
((IndexNotFoundException) e).getIndex().getName());
|
||||||
finishedHandler.onResponse(true);
|
finishedHandler.onResponse(true);
|
||||||
} else {
|
} else {
|
||||||
// all other exceptions should die
|
// all other exceptions should die
|
||||||
logger.error("[" + jobId + "] Failed to delete alias [" + aliasName + "].", e);
|
logger.error("[" + jobId + "] Failed to delete aliases [" + readAliasName + ", " + writeAliasName + "].", e);
|
||||||
finishedHandler.onFailure(e);
|
finishedHandler.onFailure(e);
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
|
|
|
@ -20,7 +20,13 @@ import org.elasticsearch.xpack.XPackPlugin;
|
||||||
import org.elasticsearch.xpack.XPackSettings;
|
import org.elasticsearch.xpack.XPackSettings;
|
||||||
import org.elasticsearch.xpack.XPackSingleNodeTestCase;
|
import org.elasticsearch.xpack.XPackSingleNodeTestCase;
|
||||||
import org.elasticsearch.xpack.ml.MachineLearningTemplateRegistry;
|
import org.elasticsearch.xpack.ml.MachineLearningTemplateRegistry;
|
||||||
|
import org.elasticsearch.xpack.ml.action.DeleteJobAction;
|
||||||
|
import org.elasticsearch.xpack.ml.action.PutJobAction;
|
||||||
import org.elasticsearch.xpack.ml.action.util.QueryPage;
|
import org.elasticsearch.xpack.ml.action.util.QueryPage;
|
||||||
|
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
|
||||||
|
import org.elasticsearch.xpack.ml.job.config.DataDescription;
|
||||||
|
import org.elasticsearch.xpack.ml.job.config.Detector;
|
||||||
|
import org.elasticsearch.xpack.ml.job.config.Job;
|
||||||
import org.elasticsearch.xpack.ml.job.config.JobTests;
|
import org.elasticsearch.xpack.ml.job.config.JobTests;
|
||||||
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
|
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
|
||||||
import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder;
|
import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder;
|
||||||
|
@ -44,6 +50,7 @@ import org.elasticsearch.xpack.ml.job.results.CategoryDefinitionTests;
|
||||||
import org.elasticsearch.xpack.ml.job.results.Influencer;
|
import org.elasticsearch.xpack.ml.job.results.Influencer;
|
||||||
import org.elasticsearch.xpack.ml.job.results.ModelPlot;
|
import org.elasticsearch.xpack.ml.job.results.ModelPlot;
|
||||||
import org.elasticsearch.xpack.ml.job.results.ModelPlotTests;
|
import org.elasticsearch.xpack.ml.job.results.ModelPlotTests;
|
||||||
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -92,7 +99,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void createComponents() {
|
public void createComponents() throws Exception {
|
||||||
renormalizer = new NoOpRenormalizer();
|
renormalizer = new NoOpRenormalizer();
|
||||||
jobResultsPersister = new JobResultsPersister(nodeSettings(), client());
|
jobResultsPersister = new JobResultsPersister(nodeSettings(), client());
|
||||||
Settings.Builder builder = Settings.builder()
|
Settings.Builder builder = Settings.builder()
|
||||||
|
@ -106,11 +113,18 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
|
||||||
capturedUpdateModelSnapshotOnJobRequests.add(modelSnapshot);
|
capturedUpdateModelSnapshotOnJobRequests.add(modelSnapshot);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
putIndexTemplates();
|
||||||
|
putJob();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void deleteJob() throws Exception {
|
||||||
|
DeleteJobAction.Request request = new DeleteJobAction.Request(JOB_ID);
|
||||||
|
DeleteJobAction.Response response = client().execute(DeleteJobAction.INSTANCE, request).actionGet();
|
||||||
|
assertTrue(response.isAcknowledged());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testProcessResults() throws Exception {
|
public void testProcessResults() throws Exception {
|
||||||
putIndexTemplates();
|
|
||||||
|
|
||||||
ResultsBuilder builder = new ResultsBuilder();
|
ResultsBuilder builder = new ResultsBuilder();
|
||||||
Bucket bucket = createBucket(false);
|
Bucket bucket = createBucket(false);
|
||||||
builder.addBucket(bucket);
|
builder.addBucket(bucket);
|
||||||
|
@ -160,7 +174,7 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
|
||||||
QueryPage<ModelSnapshot> persistedModelSnapshot = getModelSnapshots();
|
QueryPage<ModelSnapshot> persistedModelSnapshot = getModelSnapshots();
|
||||||
assertEquals(1, persistedModelSnapshot.count());
|
assertEquals(1, persistedModelSnapshot.count());
|
||||||
assertEquals(modelSnapshot, persistedModelSnapshot.results().get(0));
|
assertEquals(modelSnapshot, persistedModelSnapshot.results().get(0));
|
||||||
assertEquals(Arrays.asList(modelSnapshot), capturedUpdateModelSnapshotOnJobRequests);
|
assertEquals(Collections.singletonList(modelSnapshot), capturedUpdateModelSnapshotOnJobRequests);
|
||||||
|
|
||||||
Optional<Quantiles> persistedQuantiles = getQuantiles();
|
Optional<Quantiles> persistedQuantiles = getQuantiles();
|
||||||
assertTrue(persistedQuantiles.isPresent());
|
assertTrue(persistedQuantiles.isPresent());
|
||||||
|
@ -168,7 +182,6 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testDeleteInterimResults() throws Exception {
|
public void testDeleteInterimResults() throws Exception {
|
||||||
putIndexTemplates();
|
|
||||||
Bucket nonInterimBucket = createBucket(false);
|
Bucket nonInterimBucket = createBucket(false);
|
||||||
Bucket interimBucket = createBucket(true);
|
Bucket interimBucket = createBucket(true);
|
||||||
|
|
||||||
|
@ -197,7 +210,6 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testMultipleFlushesBetweenPersisting() throws Exception {
|
public void testMultipleFlushesBetweenPersisting() throws Exception {
|
||||||
putIndexTemplates();
|
|
||||||
Bucket finalBucket = createBucket(true);
|
Bucket finalBucket = createBucket(true);
|
||||||
List<AnomalyRecord> finalAnomalyRecords = createRecords(true);
|
List<AnomalyRecord> finalAnomalyRecords = createRecords(true);
|
||||||
|
|
||||||
|
@ -227,7 +239,6 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testEndOfStreamTriggersPersisting() throws Exception {
|
public void testEndOfStreamTriggersPersisting() throws Exception {
|
||||||
putIndexTemplates();
|
|
||||||
Bucket bucket = createBucket(false);
|
Bucket bucket = createBucket(false);
|
||||||
List<AnomalyRecord> firstSetOfRecords = createRecords(false);
|
List<AnomalyRecord> firstSetOfRecords = createRecords(false);
|
||||||
List<AnomalyRecord> secondSetOfRecords = createRecords(false);
|
List<AnomalyRecord> secondSetOfRecords = createRecords(false);
|
||||||
|
@ -269,6 +280,16 @@ public class AutodetectResultProcessorIT extends XPackSingleNodeTestCase {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void putJob() throws Exception {
|
||||||
|
Detector detector = new Detector.Builder("dc", "by_instance").build();
|
||||||
|
Job.Builder jobBuilder = new Job.Builder(JOB_ID);
|
||||||
|
jobBuilder.setDataDescription(new DataDescription.Builder());
|
||||||
|
jobBuilder.setAnalysisConfig(new AnalysisConfig.Builder(Collections.singletonList(detector)));
|
||||||
|
PutJobAction.Request request = new PutJobAction.Request(jobBuilder);
|
||||||
|
PutJobAction.Response response = client().execute(PutJobAction.INSTANCE, request).actionGet();
|
||||||
|
assertTrue(response.isAcknowledged());
|
||||||
|
}
|
||||||
|
|
||||||
private Bucket createBucket(boolean isInterim) {
|
private Bucket createBucket(boolean isInterim) {
|
||||||
Bucket bucket = new BucketTests().createTestInstance(JOB_ID);
|
Bucket bucket = new BucketTests().createTestInstance(JOB_ID);
|
||||||
bucket.setInterim(isInterim);
|
bucket.setInterim(isInterim);
|
||||||
|
|
|
@ -180,10 +180,14 @@ public class MlJobIT extends ESRestTestCase {
|
||||||
assertEquals(200, response.getStatusLine().getStatusCode());
|
assertEquals(200, response.getStatusLine().getStatusCode());
|
||||||
String responseAsString = responseEntityToString(response);
|
String responseAsString = responseEntityToString(response);
|
||||||
|
|
||||||
assertThat(responseAsString, containsString("\"" + AnomalyDetectorsIndex.jobResultsAliasedName("custom-" + indexName)
|
assertThat(responseAsString,
|
||||||
+ "\":{\"aliases\":{\"" + AnomalyDetectorsIndex.jobResultsAliasedName(jobId1)
|
containsString("\"" + AnomalyDetectorsIndex.jobResultsAliasedName("custom-" + indexName) + "\":{\"aliases\":{"));
|
||||||
+ "\":{\"filter\":{\"term\":{\"job_id\":{\"value\":\"" + jobId1 + "\",\"boost\":1.0}}}},\"" +
|
assertThat(responseAsString, containsString("\"" + AnomalyDetectorsIndex.jobResultsAliasedName(jobId1)
|
||||||
AnomalyDetectorsIndex.jobResultsAliasedName(jobId2)));
|
+ "\":{\"filter\":{\"term\":{\"job_id\":{\"value\":\"" + jobId1 + "\",\"boost\":1.0}}}}"));
|
||||||
|
assertThat(responseAsString, containsString("\"" + AnomalyDetectorsIndex.resultsWriteAlias(jobId1) + "\":{}"));
|
||||||
|
assertThat(responseAsString, containsString("\"" + AnomalyDetectorsIndex.jobResultsAliasedName(jobId2)
|
||||||
|
+ "\":{\"filter\":{\"term\":{\"job_id\":{\"value\":\"" + jobId2 + "\",\"boost\":1.0}}}}"));
|
||||||
|
assertThat(responseAsString, containsString("\"" + AnomalyDetectorsIndex.resultsWriteAlias(jobId2) + "\":{}"));
|
||||||
|
|
||||||
response = client().performRequest("get", "_cat/indices");
|
response = client().performRequest("get", "_cat/indices");
|
||||||
assertEquals(200, response.getStatusLine().getStatusCode());
|
assertEquals(200, response.getStatusLine().getStatusCode());
|
||||||
|
@ -415,20 +419,24 @@ public class MlJobIT extends ESRestTestCase {
|
||||||
client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"));
|
client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testDeleteJobAfterMissingAlias() throws Exception {
|
public void testDeleteJobAfterMissingAliases() throws Exception {
|
||||||
String jobId = "delete-job-after-missing-alias-job";
|
String jobId = "delete-job-after-missing-alias-job";
|
||||||
String aliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
|
String readAliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
|
||||||
|
String writeAliasName = AnomalyDetectorsIndex.resultsWriteAlias(jobId);
|
||||||
String indexName = AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT;
|
String indexName = AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT;
|
||||||
createFarequoteJob(jobId);
|
createFarequoteJob(jobId);
|
||||||
|
|
||||||
Response response = client().performRequest("get", "_cat/aliases");
|
Response response = client().performRequest("get", "_cat/aliases");
|
||||||
assertEquals(200, response.getStatusLine().getStatusCode());
|
assertEquals(200, response.getStatusLine().getStatusCode());
|
||||||
String responseAsString = responseEntityToString(response);
|
String responseAsString = responseEntityToString(response);
|
||||||
assertThat(responseAsString, containsString(aliasName));
|
assertThat(responseAsString, containsString(readAliasName));
|
||||||
|
assertThat(responseAsString, containsString(writeAliasName));
|
||||||
|
|
||||||
// Manually delete the alias so that we can test that deletion proceeds
|
// Manually delete the aliases so that we can test that deletion proceeds
|
||||||
// normally anyway
|
// normally anyway
|
||||||
response = client().performRequest("delete", indexName + "/_alias/" + aliasName);
|
response = client().performRequest("delete", indexName + "/_alias/" + readAliasName);
|
||||||
|
assertEquals(200, response.getStatusLine().getStatusCode());
|
||||||
|
response = client().performRequest("delete", indexName + "/_alias/" + writeAliasName);
|
||||||
assertEquals(200, response.getStatusLine().getStatusCode());
|
assertEquals(200, response.getStatusLine().getStatusCode());
|
||||||
|
|
||||||
// check alias was deleted
|
// check alias was deleted
|
||||||
|
@ -460,7 +468,7 @@ public class MlJobIT extends ESRestTestCase {
|
||||||
String recordResult =
|
String recordResult =
|
||||||
String.format(Locale.ROOT,
|
String.format(Locale.ROOT,
|
||||||
"{\"job_id\":\"%s\", \"timestamp\": \"%s\", \"bucket_span\":%d, \"result_type\":\"record\"}",
|
"{\"job_id\":\"%s\", \"timestamp\": \"%s\", \"bucket_span\":%d, \"result_type\":\"record\"}",
|
||||||
jobId, 123, 1, 1);
|
jobId, 123, 1);
|
||||||
client().performRequest("put", indexName + "/doc/" + 123,
|
client().performRequest("put", indexName + "/doc/" + 123,
|
||||||
Collections.singletonMap("refresh", "true"), new StringEntity(recordResult, ContentType.APPLICATION_JSON));
|
Collections.singletonMap("refresh", "true"), new StringEntity(recordResult, ContentType.APPLICATION_JSON));
|
||||||
client().performRequest("put", indexName + "-001/doc/" + 123,
|
client().performRequest("put", indexName + "-001/doc/" + 123,
|
||||||
|
|
|
@ -55,7 +55,6 @@ import java.io.ByteArrayOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -88,6 +87,7 @@ public class JobProviderTests extends ESTestCase {
|
||||||
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
|
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
|
||||||
clientBuilder.createIndexRequest(captor);
|
clientBuilder.createIndexRequest(captor);
|
||||||
clientBuilder.prepareAlias(resultsIndexName, AnomalyDetectorsIndex.jobResultsAliasedName("foo"), jobFilter);
|
clientBuilder.prepareAlias(resultsIndexName, AnomalyDetectorsIndex.jobResultsAliasedName("foo"), jobFilter);
|
||||||
|
clientBuilder.prepareAlias(resultsIndexName, AnomalyDetectorsIndex.resultsWriteAlias("foo"));
|
||||||
|
|
||||||
Job.Builder job = buildJobBuilder("foo");
|
Job.Builder job = buildJobBuilder("foo");
|
||||||
JobProvider provider = createProvider(clientBuilder.build());
|
JobProvider provider = createProvider(clientBuilder.build());
|
||||||
|
@ -190,13 +190,15 @@ public class JobProviderTests extends ESTestCase {
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public void testCreateJobRelatedIndicies_createsAliasBecauseIndexNameIsSet() {
|
public void testCreateJobRelatedIndicies_createsAliasBecauseIndexNameIsSet() {
|
||||||
String indexName = AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + "custom-bar";
|
String indexName = AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + "custom-bar";
|
||||||
String aliasName = AnomalyDetectorsIndex.jobResultsAliasedName("foo");
|
String readAliasName = AnomalyDetectorsIndex.jobResultsAliasedName("foo");
|
||||||
|
String writeAliasName = AnomalyDetectorsIndex.resultsWriteAlias("foo");
|
||||||
QueryBuilder jobFilter = QueryBuilders.termQuery("job_id", "foo");
|
QueryBuilder jobFilter = QueryBuilders.termQuery("job_id", "foo");
|
||||||
|
|
||||||
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
|
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
|
||||||
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
|
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
|
||||||
clientBuilder.createIndexRequest(captor);
|
clientBuilder.createIndexRequest(captor);
|
||||||
clientBuilder.prepareAlias(indexName, aliasName, jobFilter);
|
clientBuilder.prepareAlias(indexName, readAliasName, jobFilter);
|
||||||
|
clientBuilder.prepareAlias(indexName, writeAliasName);
|
||||||
clientBuilder.preparePutMapping(mock(PutMappingResponse.class), Result.TYPE.getPreferredName());
|
clientBuilder.preparePutMapping(mock(PutMappingResponse.class), Result.TYPE.getPreferredName());
|
||||||
|
|
||||||
Job.Builder job = buildJobBuilder("foo");
|
Job.Builder job = buildJobBuilder("foo");
|
||||||
|
@ -221,6 +223,8 @@ public class JobProviderTests extends ESTestCase {
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(Boolean aBoolean) {
|
public void onResponse(Boolean aBoolean) {
|
||||||
verify(client.admin().indices(), times(1)).prepareAliases();
|
verify(client.admin().indices(), times(1)).prepareAliases();
|
||||||
|
verify(client.admin().indices().prepareAliases(), times(1)).addAlias(indexName, readAliasName, jobFilter);
|
||||||
|
verify(client.admin().indices().prepareAliases(), times(1)).addAlias(indexName, writeAliasName);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -246,7 +250,7 @@ public class JobProviderTests extends ESTestCase {
|
||||||
SearchResponse response = createSearchResponse(source);
|
SearchResponse response = createSearchResponse(source);
|
||||||
int from = 0;
|
int from = 0;
|
||||||
int size = 10;
|
int size = 10;
|
||||||
Client client = getMockedClient(queryBuilder -> {queryBuilderHolder[0] = queryBuilder;}, response);
|
Client client = getMockedClient(queryBuilder -> queryBuilderHolder[0] = queryBuilder, response);
|
||||||
JobProvider provider = createProvider(client);
|
JobProvider provider = createProvider(client);
|
||||||
|
|
||||||
BucketsQueryBuilder bq = new BucketsQueryBuilder().from(from).size(size).anomalyScoreThreshold(1.0);
|
BucketsQueryBuilder bq = new BucketsQueryBuilder().from(from).size(size).anomalyScoreThreshold(1.0);
|
||||||
|
@ -348,7 +352,7 @@ public class JobProviderTests extends ESTestCase {
|
||||||
BucketsQueryBuilder bq = new BucketsQueryBuilder();
|
BucketsQueryBuilder bq = new BucketsQueryBuilder();
|
||||||
bq.timestamp(Long.toString(timestamp));
|
bq.timestamp(Long.toString(timestamp));
|
||||||
Exception[] holder = new Exception[1];
|
Exception[] holder = new Exception[1];
|
||||||
provider.buckets(jobId, bq.build(), q -> {}, e -> {holder[0] = e;}, client);
|
provider.buckets(jobId, bq.build(), q -> {}, e -> holder[0] = e, client);
|
||||||
assertEquals(ResourceNotFoundException.class, holder[0].getClass());
|
assertEquals(ResourceNotFoundException.class, holder[0].getClass());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -373,7 +377,7 @@ public class JobProviderTests extends ESTestCase {
|
||||||
|
|
||||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||||
QueryPage<Bucket>[] bucketHolder = new QueryPage[1];
|
QueryPage<Bucket>[] bucketHolder = new QueryPage[1];
|
||||||
provider.buckets(jobId, bq.build(), q -> {bucketHolder[0] = q;}, e -> {}, client);
|
provider.buckets(jobId, bq.build(), q -> bucketHolder[0] = q, e -> {}, client);
|
||||||
assertThat(bucketHolder[0].count(), equalTo(1L));
|
assertThat(bucketHolder[0].count(), equalTo(1L));
|
||||||
Bucket b = bucketHolder[0].results().get(0);
|
Bucket b = bucketHolder[0].results().get(0);
|
||||||
assertEquals(now, b.getTimestamp());
|
assertEquals(now, b.getTimestamp());
|
||||||
|
@ -400,7 +404,7 @@ public class JobProviderTests extends ESTestCase {
|
||||||
bq.timestamp(Long.toString(now.getTime()));
|
bq.timestamp(Long.toString(now.getTime()));
|
||||||
|
|
||||||
Exception[] holder = new Exception[1];
|
Exception[] holder = new Exception[1];
|
||||||
provider.buckets(jobId, bq.build(), q -> {}, e -> {holder[0] = e;}, client);
|
provider.buckets(jobId, bq.build(), q -> {}, e -> holder[0] = e, client);
|
||||||
assertEquals(ResourceNotFoundException.class, holder[0].getClass());
|
assertEquals(ResourceNotFoundException.class, holder[0].getClass());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -601,7 +605,7 @@ public class JobProviderTests extends ESTestCase {
|
||||||
JobProvider provider = createProvider(client);
|
JobProvider provider = createProvider(client);
|
||||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||||
QueryPage<CategoryDefinition>[] holder = new QueryPage[1];
|
QueryPage<CategoryDefinition>[] holder = new QueryPage[1];
|
||||||
provider.categoryDefinitions(jobId, null, from, size, r -> {holder[0] = r;},
|
provider.categoryDefinitions(jobId, null, from, size, r -> holder[0] = r,
|
||||||
e -> {throw new RuntimeException(e);}, client);
|
e -> {throw new RuntimeException(e);}, client);
|
||||||
QueryPage<CategoryDefinition> categoryDefinitions = holder[0];
|
QueryPage<CategoryDefinition> categoryDefinitions = holder[0];
|
||||||
assertEquals(1L, categoryDefinitions.count());
|
assertEquals(1L, categoryDefinitions.count());
|
||||||
|
@ -625,7 +629,7 @@ public class JobProviderTests extends ESTestCase {
|
||||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||||
QueryPage<CategoryDefinition>[] holder = new QueryPage[1];
|
QueryPage<CategoryDefinition>[] holder = new QueryPage[1];
|
||||||
provider.categoryDefinitions(jobId, categoryId, null, null,
|
provider.categoryDefinitions(jobId, categoryId, null, null,
|
||||||
r -> {holder[0] = r;}, e -> {throw new RuntimeException(e);}, client);
|
r -> holder[0] = r, e -> {throw new RuntimeException(e);}, client);
|
||||||
QueryPage<CategoryDefinition> categoryDefinitions = holder[0];
|
QueryPage<CategoryDefinition> categoryDefinitions = holder[0];
|
||||||
assertEquals(1L, categoryDefinitions.count());
|
assertEquals(1L, categoryDefinitions.count());
|
||||||
assertEquals(terms, categoryDefinitions.results().get(0).getTerms());
|
assertEquals(terms, categoryDefinitions.results().get(0).getTerms());
|
||||||
|
@ -895,8 +899,7 @@ public class JobProviderTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
private Bucket createBucketAtEpochTime(long epoch) {
|
private Bucket createBucketAtEpochTime(long epoch) {
|
||||||
Bucket b = new Bucket("foo", new Date(epoch), 123);
|
return new Bucket("foo", new Date(epoch), 123);
|
||||||
return b;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private JobProvider createProvider(Client client) {
|
private JobProvider createProvider(Client client) {
|
||||||
|
@ -919,8 +922,8 @@ public class JobProviderTests extends ESTestCase {
|
||||||
Map<String, Object> _source = new HashMap<>(map);
|
Map<String, Object> _source = new HashMap<>(map);
|
||||||
|
|
||||||
Map<String, SearchHitField> fields = new HashMap<>();
|
Map<String, SearchHitField> fields = new HashMap<>();
|
||||||
fields.put("field_1", new SearchHitField("field_1", Arrays.asList("foo")));
|
fields.put("field_1", new SearchHitField("field_1", Collections.singletonList("foo")));
|
||||||
fields.put("field_2", new SearchHitField("field_2", Arrays.asList("foo")));
|
fields.put("field_2", new SearchHitField("field_2", Collections.singletonList("foo")));
|
||||||
|
|
||||||
SearchHit hit = new SearchHit(123, String.valueOf(map.hashCode()), new Text("foo"), fields)
|
SearchHit hit = new SearchHit(123, String.valueOf(map.hashCode()), new Text("foo"), fields)
|
||||||
.sourceRef(XContentFactory.jsonBuilder().map(_source).bytes());
|
.sourceRef(XContentFactory.jsonBuilder().map(_source).bytes());
|
||||||
|
|
|
@ -49,7 +49,6 @@ import org.elasticsearch.search.sort.SortBuilder;
|
||||||
import org.elasticsearch.search.sort.SortOrder;
|
import org.elasticsearch.search.sort.SortOrder;
|
||||||
import org.elasticsearch.xpack.ml.action.DeleteJobAction;
|
import org.elasticsearch.xpack.ml.action.DeleteJobAction;
|
||||||
import org.mockito.ArgumentCaptor;
|
import org.mockito.ArgumentCaptor;
|
||||||
import org.mockito.Mock;
|
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
@ -70,21 +69,20 @@ import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
public class MockClientBuilder {
|
public class MockClientBuilder {
|
||||||
@Mock
|
|
||||||
private Client client;
|
private Client client;
|
||||||
|
|
||||||
@Mock
|
|
||||||
private AdminClient adminClient;
|
private AdminClient adminClient;
|
||||||
@Mock
|
|
||||||
private ClusterAdminClient clusterAdminClient;
|
private ClusterAdminClient clusterAdminClient;
|
||||||
@Mock
|
|
||||||
private IndicesAdminClient indicesAdminClient;
|
private IndicesAdminClient indicesAdminClient;
|
||||||
|
|
||||||
|
private IndicesAliasesRequestBuilder aliasesRequestBuilder;
|
||||||
|
|
||||||
public MockClientBuilder(String clusterName) {
|
public MockClientBuilder(String clusterName) {
|
||||||
client = mock(Client.class);
|
client = mock(Client.class);
|
||||||
adminClient = mock(AdminClient.class);
|
adminClient = mock(AdminClient.class);
|
||||||
clusterAdminClient = mock(ClusterAdminClient.class);
|
clusterAdminClient = mock(ClusterAdminClient.class);
|
||||||
indicesAdminClient = mock(IndicesAdminClient.class);
|
indicesAdminClient = mock(IndicesAdminClient.class);
|
||||||
|
aliasesRequestBuilder = mock(IndicesAliasesRequestBuilder.class);
|
||||||
|
|
||||||
when(client.admin()).thenReturn(adminClient);
|
when(client.admin()).thenReturn(adminClient);
|
||||||
when(adminClient.cluster()).thenReturn(clusterAdminClient);
|
when(adminClient.cluster()).thenReturn(clusterAdminClient);
|
||||||
|
@ -282,7 +280,6 @@ public class MockClientBuilder {
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public MockClientBuilder prepareAlias(String indexName, String alias, QueryBuilder filter) {
|
public MockClientBuilder prepareAlias(String indexName, String alias, QueryBuilder filter) {
|
||||||
IndicesAliasesRequestBuilder aliasesRequestBuilder = mock(IndicesAliasesRequestBuilder.class);
|
|
||||||
when(aliasesRequestBuilder.addAlias(eq(indexName), eq(alias), eq(filter))).thenReturn(aliasesRequestBuilder);
|
when(aliasesRequestBuilder.addAlias(eq(indexName), eq(alias), eq(filter))).thenReturn(aliasesRequestBuilder);
|
||||||
when(indicesAdminClient.prepareAliases()).thenReturn(aliasesRequestBuilder);
|
when(indicesAdminClient.prepareAliases()).thenReturn(aliasesRequestBuilder);
|
||||||
doAnswer(new Answer<Void>() {
|
doAnswer(new Answer<Void>() {
|
||||||
|
@ -297,6 +294,22 @@ public class MockClientBuilder {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public MockClientBuilder prepareAlias(String indexName, String alias) {
|
||||||
|
when(aliasesRequestBuilder.addAlias(eq(indexName), eq(alias))).thenReturn(aliasesRequestBuilder);
|
||||||
|
when(indicesAdminClient.prepareAliases()).thenReturn(aliasesRequestBuilder);
|
||||||
|
doAnswer(new Answer<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||||
|
ActionListener<IndicesAliasesResponse> listener =
|
||||||
|
(ActionListener<IndicesAliasesResponse>) invocationOnMock.getArguments()[0];
|
||||||
|
listener.onResponse(mock(IndicesAliasesResponse.class));
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}).when(aliasesRequestBuilder).execute(any());
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public MockClientBuilder prepareBulk(BulkResponse response) {
|
public MockClientBuilder prepareBulk(BulkResponse response) {
|
||||||
PlainActionFuture<BulkResponse> actionFuture = mock(PlainActionFuture.class);
|
PlainActionFuture<BulkResponse> actionFuture = mock(PlainActionFuture.class);
|
||||||
|
|
|
@ -40,6 +40,16 @@
|
||||||
- match: { count: 1 }
|
- match: { count: 1 }
|
||||||
- match: { jobs.0.job_id: "job-crud-test-apis" }
|
- match: { jobs.0.job_id: "job-crud-test-apis" }
|
||||||
|
|
||||||
|
- do:
|
||||||
|
indices.get_alias:
|
||||||
|
name: ".ml-anomalies-job-crud-test-apis"
|
||||||
|
- match: { \.ml-anomalies-shared.aliases.\.ml-anomalies-job-crud-test-apis.filter.term.job_id.value: job-crud-test-apis }
|
||||||
|
|
||||||
|
- do:
|
||||||
|
indices.get_alias:
|
||||||
|
name: ".ml-anomalies-.write-job-crud-test-apis"
|
||||||
|
- match: { \.ml-anomalies-shared.aliases.\.ml-anomalies-\.write-job-crud-test-apis: {} }
|
||||||
|
|
||||||
- do:
|
- do:
|
||||||
xpack.ml.delete_job:
|
xpack.ml.delete_job:
|
||||||
job_id: "job-crud-test-apis"
|
job_id: "job-crud-test-apis"
|
||||||
|
@ -50,6 +60,16 @@
|
||||||
index: ".ml-anomalies-job-crud-test-apis"
|
index: ".ml-anomalies-job-crud-test-apis"
|
||||||
- is_false: ''
|
- is_false: ''
|
||||||
|
|
||||||
|
- do:
|
||||||
|
indices.exists_alias:
|
||||||
|
name: ".ml-anomalies-job-crud-test-apis"
|
||||||
|
- is_false: ''
|
||||||
|
|
||||||
|
- do:
|
||||||
|
indices.exists_alias:
|
||||||
|
name: ".ml-anomalies-.write-job-crud-test-apis"
|
||||||
|
- is_false: ''
|
||||||
|
|
||||||
---
|
---
|
||||||
"Test get job API with non existing job id":
|
"Test get job API with non existing job id":
|
||||||
- do:
|
- do:
|
||||||
|
|
Loading…
Reference in New Issue