[ML] Separate read from write index for results (elastic/x-pack-elasticsearch#1397)

This is in preparation of introducing a write alias.
It adjusts all requests to persist results to do so
using a method that returns the write alias (even though
it currently returns the same as the read alias).

Relates elastic/x-pack-elasticsearch#827

Original commit: elastic/x-pack-elasticsearch@1358dd8dcf
This commit is contained in:
Dimitris Athanasiou 2017-05-11 14:44:40 +01:00 committed by GitHub
parent 4d3bc71327
commit ba40994b1f
4 changed files with 21 additions and 8 deletions

View File

@ -414,7 +414,7 @@ public class JobManager extends AbstractComponent {
* @param modelSnapshot the updated model snapshot object to be stored
*/
public void updateModelSnapshot(ModelSnapshot modelSnapshot, Consumer<Boolean> handler, Consumer<Exception> errorHandler) {
String index = AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId());
String index = AnomalyDetectorsIndex.resultsWriteAlias(modelSnapshot.getJobId());
IndexRequest indexRequest = new IndexRequest(index, ModelSnapshot.TYPE.getPreferredName(), ModelSnapshot.documentId(modelSnapshot));
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
modelSnapshot.toXContent(builder, ToXContent.EMPTY_PARAMS);

View File

@ -38,6 +38,16 @@ public final class AnomalyDetectorsIndex {
return RESULTS_INDEX_PREFIX + jobId;
}
/**
* The name of the alias pointing to the write index for a job
* @param jobId Job Id
* @return The write alias
*/
public static String resultsWriteAlias(String jobId) {
// TODO: Replace with an actual write alias
return jobResultsAliasedName(jobId);
}
/**
* Retrieves the currently defined physical index from the job state
* @param jobId Job Id

View File

@ -47,7 +47,7 @@ public class JobDataCountsPersister extends AbstractComponent {
*/
public void persistDataCounts(String jobId, DataCounts counts, ActionListener<Boolean> listener) {
try (XContentBuilder content = serialiseCounts(counts)) {
client.prepareIndex(AnomalyDetectorsIndex.jobResultsAliasedName(jobId), DataCounts.TYPE.getPreferredName(),
client.prepareIndex(AnomalyDetectorsIndex.resultsWriteAlias(jobId), DataCounts.TYPE.getPreferredName(),
DataCounts.documentId(jobId))
.setSource(content).execute(new ActionListener<IndexResponse>() {
@Override

View File

@ -77,7 +77,7 @@ public class JobResultsPersister extends AbstractComponent {
private Builder(String jobId) {
this.jobId = Objects.requireNonNull(jobId);
indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
indexName = AnomalyDetectorsIndex.resultsWriteAlias(jobId);
bulkRequest = new BulkRequest();
}
@ -225,7 +225,7 @@ public class JobResultsPersister extends AbstractComponent {
public void persistCategoryDefinition(CategoryDefinition category) {
Persistable persistable = new Persistable(category.getJobId(), category, CategoryDefinition.TYPE.getPreferredName(),
CategoryDefinition.documentId(category.getJobId(), Long.toString(category.getCategoryId())));
persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(category.getJobId())).actionGet();
persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(category.getJobId())).actionGet();
// Don't commit as we expect masses of these updates and they're not
// read again by this process
}
@ -255,7 +255,7 @@ public class JobResultsPersister extends AbstractComponent {
public void persistModelSnapshot(ModelSnapshot modelSnapshot) {
Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.TYPE.getPreferredName(),
ModelSnapshot.documentId(modelSnapshot));
persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId())).actionGet();
persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(modelSnapshot.getJobId())).actionGet();
}
/**
@ -265,7 +265,7 @@ public class JobResultsPersister extends AbstractComponent {
String jobId = modelSizeStats.getJobId();
logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes());
Persistable persistable = new Persistable(jobId, modelSizeStats, Result.TYPE.getPreferredName(), modelSizeStats.documentId());
persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)).actionGet();
persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(jobId)).actionGet();
// Don't commit as we expect masses of these updates and they're only
// for information at the API level
}
@ -279,7 +279,7 @@ public class JobResultsPersister extends AbstractComponent {
logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes());
Persistable persistable = new Persistable(jobId, modelSizeStats, Result.TYPE.getPreferredName(), modelSizeStats.documentId());
persistable.setRefreshPolicy(refreshPolicy);
persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(jobId), listener);
persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(jobId), listener);
// Don't commit as we expect masses of these updates and they're only
// for information at the API level
}
@ -289,7 +289,7 @@ public class JobResultsPersister extends AbstractComponent {
*/
public void persistModelPlot(ModelPlot modelPlot) {
Persistable persistable = new Persistable(modelPlot.getJobId(), modelPlot, Result.TYPE.getPreferredName(), null);
persistable.persist(AnomalyDetectorsIndex.jobResultsAliasedName(modelPlot.getJobId())).actionGet();
persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(modelPlot.getJobId())).actionGet();
// Don't commit as we expect masses of these updates and they're not
// read again by this process
}
@ -309,7 +309,10 @@ public class JobResultsPersister extends AbstractComponent {
* @return True if successful
*/
public boolean commitResultWrites(String jobId) {
// We refresh using the read alias in order to ensure all indices will
// be refreshed even if a rollover occurs in between.
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
// Refresh should wait for Lucene to make the data searchable
logger.trace("[{}] ES API CALL: refresh index {}", jobId, indexName);
RefreshRequest refreshRequest = new RefreshRequest(indexName);