diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 1909ed08ae5..5aebca39707 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -414,7 +414,7 @@ public class JobManager extends AbstractComponent { * @param modelSnapshot the updated model snapshot object to be stored */ public void updateModelSnapshot(ModelSnapshot modelSnapshot, Consumer handler, Consumer errorHandler) { - String index = AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()); + String index = AnomalyDetectorsIndex.resultsWriteAlias(modelSnapshot.getJobId()); IndexRequest indexRequest = new IndexRequest(index, ModelSnapshot.TYPE.getPreferredName(), ModelSnapshot.documentId(modelSnapshot)); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { modelSnapshot.toXContent(builder, ToXContent.EMPTY_PARAMS); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/AnomalyDetectorsIndex.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/AnomalyDetectorsIndex.java index 4eb1d0aef97..641760462c1 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/AnomalyDetectorsIndex.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/AnomalyDetectorsIndex.java @@ -38,6 +38,16 @@ public final class AnomalyDetectorsIndex { return RESULTS_INDEX_PREFIX + jobId; } + /** + * The name of the alias pointing to the write index for a job + * @param jobId Job Id + * @return The write alias + */ + public static String resultsWriteAlias(String jobId) { + // TODO: Replace with an actual write alias + return jobResultsAliasedName(jobId); + } + /** * Retrieves the currently defined physical index from the job state * @param jobId Job Id diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java index 64f9747b684..22e1b5a399c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataCountsPersister.java @@ -47,7 +47,7 @@ public class JobDataCountsPersister extends AbstractComponent { */ public void persistDataCounts(String jobId, DataCounts counts, ActionListener listener) { try (XContentBuilder content = serialiseCounts(counts)) { - client.prepareIndex(AnomalyDetectorsIndex.jobResultsAliasedName(jobId), DataCounts.TYPE.getPreferredName(), + client.prepareIndex(AnomalyDetectorsIndex.resultsWriteAlias(jobId), DataCounts.TYPE.getPreferredName(), DataCounts.documentId(jobId)) .setSource(content).execute(new ActionListener() { @Override diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java index c8141290bcd..2babba53ec3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java @@ -77,7 +77,7 @@ public class JobResultsPersister extends AbstractComponent { private Builder(String jobId) { this.jobId = Objects.requireNonNull(jobId); - indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); + indexName = AnomalyDetectorsIndex.resultsWriteAlias(jobId); bulkRequest = new BulkRequest(); } @@ -225,7 +225,7 @@ public class JobResultsPersister extends AbstractComponent { public void persistCategoryDefinition(CategoryDefinition category) { Persistable persistable = new Persistable(category.getJobId(), category, CategoryDefinition.TYPE.getPreferredName(), CategoryDefinition.documentId(category.getJobId(), Long.toString(category.getCategoryId()))); - persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(category.getJobId())).actionGet(); + persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(category.getJobId())).actionGet(); // Don't commit as we expect masses of these updates and they're not // read again by this process } @@ -255,7 +255,7 @@ public class JobResultsPersister extends AbstractComponent { public void persistModelSnapshot(ModelSnapshot modelSnapshot) { Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.TYPE.getPreferredName(), ModelSnapshot.documentId(modelSnapshot)); - persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId())).actionGet(); + persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(modelSnapshot.getJobId())).actionGet(); } /** @@ -265,7 +265,7 @@ public class JobResultsPersister extends AbstractComponent { String jobId = modelSizeStats.getJobId(); logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes()); Persistable persistable = new Persistable(jobId, modelSizeStats, Result.TYPE.getPreferredName(), modelSizeStats.documentId()); - persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)).actionGet(); + persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(jobId)).actionGet(); // Don't commit as we expect masses of these updates and they're only // for information at the API level } @@ -279,7 +279,7 @@ public class JobResultsPersister extends AbstractComponent { logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes()); Persistable persistable = new Persistable(jobId, modelSizeStats, Result.TYPE.getPreferredName(), modelSizeStats.documentId()); persistable.setRefreshPolicy(refreshPolicy); - persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(jobId), listener); + persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(jobId), listener); // Don't commit as we expect masses of these updates and they're only // for information at the API level } @@ -289,7 +289,7 @@ public class JobResultsPersister extends AbstractComponent { */ public void persistModelPlot(ModelPlot modelPlot) { Persistable persistable = new Persistable(modelPlot.getJobId(), modelPlot, Result.TYPE.getPreferredName(), null); - persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(modelPlot.getJobId())).actionGet(); + persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(modelPlot.getJobId())).actionGet(); // Don't commit as we expect masses of these updates and they're not // read again by this process } @@ -309,7 +309,10 @@ public class JobResultsPersister extends AbstractComponent { * @return True if successful */ public boolean commitResultWrites(String jobId) { + // We refresh using the read alias in order to ensure all indices will + // be refreshed even if a rollover occurs in between. String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); + // Refresh should wait for Lucene to make the data searchable logger.trace("[{}] ES API CALL: refresh index {}", jobId, indexName); RefreshRequest refreshRequest = new RefreshRequest(indexName);