From 02da8e7cd962b1fc7bcc7f14cdb6d0e88b2c3305 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 16 Jun 2017 15:18:16 +0100 Subject: [PATCH] [ML] Use bulk request to persist model plots (elastic/x-pack-elasticsearch#1714) * Use bulk request to persist model plots and model size stats * Revert persisting model size stats in the bulk request * Refactor results persister Original commit: elastic/x-pack-elasticsearch@f51297bfc246d3bdc4bfbdb830cfcd37b619a44f --- .../JobRenormalizedResultsPersister.java | 5 +- .../job/persistence/JobResultsPersister.java | 79 ++++++++----------- .../output/AutoDetectResultProcessor.java | 2 +- .../JobRenormalizedResultsPersisterTests.java | 21 +++++ .../persistence/JobResultsPersisterTests.java | 20 +++++ .../AutoDetectResultProcessorTests.java | 10 +-- 6 files changed, 82 insertions(+), 55 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersister.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersister.java index dcd62706db4..b23b5ec65be 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersister.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersister.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.job.persistence; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; @@ -39,7 +40,7 @@ public class JobRenormalizedResultsPersister extends AbstractComponent { /** * Execute bulk requests when they reach this size */ - private static final int BULK_LIMIT = 10000; + static final int BULK_LIMIT = 10000; private final String jobId; private final Client client; @@ -75,7 +76,7 @@ public class JobRenormalizedResultsPersister extends AbstractComponent { try (XContentBuilder content = toXContentBuilder(resultDoc)) { bulkRequest.add(new IndexRequest(index, DOC_TYPE, id).source(content)); } catch (IOException e) { - logger.error("Error serialising result", e); + logger.error(new ParameterizedMessage("[{}] Error serialising result", jobId), e); } if (bulkRequest.numberOfActions() >= BULK_LIMIT) { executeRequest(); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java index c107038e7c3..45cc6cb5b8a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java @@ -94,29 +94,21 @@ public class JobResultsPersister extends AbstractComponent { bucketWithoutRecords = new Bucket(bucket); bucketWithoutRecords.setRecords(Collections.emptyList()); } - try (XContentBuilder content = toXContentBuilder(bucketWithoutRecords)) { - String id = bucketWithoutRecords.getId(); - logger.trace("[{}] ES API CALL: index bucket to index [{}] with ID [{}]", jobId, indexName, id); + String id = bucketWithoutRecords.getId(); + logger.trace("[{}] ES API CALL: index bucket to index [{}] with ID [{}]", jobId, indexName, id); + indexResult(id, bucketWithoutRecords, "bucket"); - bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, id).source(content)); - - persistBucketInfluencersStandalone(jobId, bucketWithoutRecords.getBucketInfluencers()); - } catch (IOException e) { - logger.error(new ParameterizedMessage("[{}] Error serialising bucket", jobId), e); - } + persistBucketInfluencersStandalone(jobId, bucketWithoutRecords.getBucketInfluencers()); return this; } - private void persistBucketInfluencersStandalone(String jobId, List bucketInfluencers) throws IOException { + private void persistBucketInfluencersStandalone(String jobId, List bucketInfluencers) { if (bucketInfluencers != null && bucketInfluencers.isEmpty() == false) { for (BucketInfluencer bucketInfluencer : bucketInfluencers) { - try (XContentBuilder content = toXContentBuilder(bucketInfluencer)) { - // Need consistent IDs to ensure overwriting on renormalization - String id = bucketInfluencer.getId(); - logger.trace("[{}] ES BULK ACTION: index bucket influencer to index [{}] with ID [{}]", jobId, indexName, id); - bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, id).source(content)); - } + String id = bucketInfluencer.getId(); + logger.trace("[{}] ES BULK ACTION: index bucket influencer to index [{}] with ID [{}]", jobId, indexName, id); + indexResult(id, bucketInfluencer, "bucket influencer"); } } } @@ -128,17 +120,9 @@ public class JobResultsPersister extends AbstractComponent { * @return this */ public Builder persistRecords(List records) { - - try { - for (AnomalyRecord record : records) { - try (XContentBuilder content = toXContentBuilder(record)) { - String id = record.getId(); - logger.trace("[{}] ES BULK ACTION: index record to index [{}] with ID [{}]", jobId, indexName, id); - bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, id).source(content)); - } - } - } catch (IOException e) { - logger.error(new ParameterizedMessage("[{}] Error serialising records", new Object [] {jobId}), e); + for (AnomalyRecord record : records) { + logger.trace("[{}] ES BULK ACTION: index record to index [{}] with ID [{}]", jobId, indexName, record.getId()); + indexResult(record.getId(), record, "record"); } return this; @@ -152,21 +136,32 @@ public class JobResultsPersister extends AbstractComponent { * @return this */ public Builder persistInfluencers(List influencers) { - try { - for (Influencer influencer : influencers) { - try (XContentBuilder content = toXContentBuilder(influencer)) { - String id = influencer.getId(); - logger.trace("[{}] ES BULK ACTION: index influencer to index [{}] with ID [{}]", jobId, indexName, id); - bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, id).source(content)); - } - } - } catch (IOException e) { - logger.error(new ParameterizedMessage("[{}] Error serialising influencers", new Object[] {jobId}), e); + for (Influencer influencer : influencers) { + logger.trace("[{}] ES BULK ACTION: index influencer to index [{}] with ID [{}]", jobId, indexName, influencer.getId()); + indexResult(influencer.getId(), influencer, "influencer"); } return this; } + public Builder persistModelPlot(ModelPlot modelPlot) { + logger.trace("[{}] ES BULK ACTION: index model plot to index [{}] with ID [{}]", jobId, indexName, modelPlot.getId()); + indexResult(modelPlot.getId(), modelPlot, "model plot"); + return this; + } + + private void indexResult(String id, ToXContent resultDoc, String resultType) { + try (XContentBuilder content = toXContentBuilder(resultDoc)) { + bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, id).source(content)); + } catch (IOException e) { + logger.error(new ParameterizedMessage("[{}] Error serialising {}", jobId, resultType), e); + } + + if (bulkRequest.numberOfActions() >= JobRenormalizedResultsPersister.BULK_LIMIT) { + executeRequest(); + } + } + /** * Execute the bulk action */ @@ -254,16 +249,6 @@ public class JobResultsPersister extends AbstractComponent { // for information at the API level } - /** - * Persist model plot output - */ - public void persistModelPlot(ModelPlot modelPlot) { - Persistable persistable = new Persistable(modelPlot.getJobId(), modelPlot, modelPlot.getId()); - persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(modelPlot.getJobId())).actionGet(); - // Don't commit as we expect masses of these updates and they're not - // read again by this process - } - /** * Delete any existing interim results synchronously */ diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index 99d389977af..384a3eee677 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -191,7 +191,7 @@ public class AutoDetectResultProcessor { } ModelPlot modelPlot = result.getModelPlot(); if (modelPlot != null) { - persister.persistModelPlot(modelPlot); + context.bulkResultsPersister.persistModelPlot(modelPlot); } ModelSizeStats modelSizeStats = result.getModelSizeStats(); if (modelSizeStats != null) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersisterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersisterTests.java index ce84f4eca7b..eb2ef0c7a43 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersisterTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersisterTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.job.persistence; +import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.Settings; @@ -12,10 +13,16 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.job.process.normalizer.BucketNormalizable; import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.job.results.BucketInfluencer; +import org.elasticsearch.xpack.ml.job.results.ModelPlot; +import org.mockito.ArgumentCaptor; import java.util.Date; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; public class JobRenormalizedResultsPersisterTests extends ESTestCase { @@ -37,6 +44,20 @@ public class JobRenormalizedResultsPersisterTests extends ESTestCase { assertEquals(0, persister.getBulkRequest().numberOfActions()); } + public void testBulkRequestExecutesWhenReachMaxDocs() { + BulkResponse bulkResponse = mock(BulkResponse.class); + Client client = new MockClientBuilder("cluster").bulk(bulkResponse).build(); + JobRenormalizedResultsPersister persister = new JobRenormalizedResultsPersister("foo", Settings.EMPTY, client); + + ModelPlot modelPlot = new ModelPlot("foo", new Date(), 123456); + for (int i=0; i<=JobRenormalizedResultsPersister.BULK_LIMIT; i++) { + persister.updateResult("bar", "index-foo", modelPlot); + } + + verify(client, times(1)).bulk(any()); + verifyNoMoreInteractions(client); + } + private JobRenormalizedResultsPersister createJobRenormalizedResultsPersister() { BulkResponse bulkResponse = mock(BulkResponse.class); when(bulkResponse.hasFailures()).thenReturn(false); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java index ca6562ac9c1..f5870de0ce2 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.job.results.BucketInfluencer; import org.elasticsearch.xpack.ml.job.results.Influencer; +import org.elasticsearch.xpack.ml.job.results.ModelPlot; import org.mockito.ArgumentCaptor; import java.io.IOException; @@ -25,7 +26,11 @@ import java.util.Collections; import java.util.Date; import java.util.List; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @@ -172,6 +177,21 @@ public class JobResultsPersisterTests extends ESTestCase { assertEquals(0, builder.getBulkRequest().numberOfActions()); } + public void testBulkRequestExecutesWhenReachMaxDocs() { + ArgumentCaptor captor = ArgumentCaptor.forClass(BulkRequest.class); + Client client = mockClient(captor); + JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client); + + JobResultsPersister.Builder bulkBuilder = persister.bulkPersisterBuilder("foo"); + ModelPlot modelPlot = new ModelPlot("foo", new Date(), 123456); + for (int i=0; i<=JobRenormalizedResultsPersister.BULK_LIMIT; i++) { + bulkBuilder.persistModelPlot(modelPlot); + } + + verify(client, times(1)).bulk(any()); + verifyNoMoreInteractions(client); + } + @SuppressWarnings({"unchecked", "rawtypes"}) private Client mockClient(ArgumentCaptor captor) { Client client = mock(Client.class); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index cc54eb71908..7f979095428 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -240,7 +240,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { when(result.getModelPlot()).thenReturn(modelPlot); processorUnderTest.processResult(context, result); - verify(persister, times(1)).persistModelPlot(modelPlot); + verify(bulkBuilder, times(1)).persistModelPlot(modelPlot); verifyNoMoreInteractions(persister); } @@ -314,8 +314,8 @@ public class AutoDetectResultProcessorTests extends ESTestCase { public void testPersisterThrowingDoesntBlockProcessing() { AutodetectResult autodetectResult = mock(AutodetectResult.class); - ModelSizeStats modelSizeStats = mock(ModelSizeStats.class); - when(autodetectResult.getModelSizeStats()).thenReturn(modelSizeStats); + ModelSnapshot modelSnapshot = mock(ModelSnapshot.class); + when(autodetectResult.getModelSnapshot()).thenReturn(modelSnapshot); @SuppressWarnings("unchecked") Iterator iterator = mock(Iterator.class); @@ -324,10 +324,10 @@ public class AutoDetectResultProcessorTests extends ESTestCase { AutodetectProcess process = mock(AutodetectProcess.class); when(process.readAutodetectResults()).thenReturn(iterator); - doThrow(new ElasticsearchException("this test throws")).when(persister).persistModelSizeStats(any()); + doThrow(new ElasticsearchException("this test throws")).when(persister).persistModelSnapshot(any()); processorUnderTest.process(process); - verify(persister, times(2)).persistModelSizeStats(any()); + verify(persister, times(2)).persistModelSnapshot(any()); } public void testParsingErrorSetsFailed() {