[ML] Use bulk request to persist model plots (elastic/x-pack-elasticsearch#1714)

* Use bulk request to persist model plots and model size stats

* Revert persisting model size stats in the bulk request

* Refactor results persister

Original commit: elastic/x-pack-elasticsearch@f51297bfc2
This commit is contained in:
David Kyle 2017-06-16 15:18:16 +01:00 committed by GitHub
parent c63d4e306b
commit 02da8e7cd9
6 changed files with 82 additions and 55 deletions

View File

@@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.job.persistence;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
@@ -39,7 +40,7 @@ public class JobRenormalizedResultsPersister extends AbstractComponent {
/**
* Execute bulk requests when they reach this size
*/
private static final int BULK_LIMIT = 10000;
static final int BULK_LIMIT = 10000;
private final String jobId;
private final Client client;
@@ -75,7 +76,7 @@ public class JobRenormalizedResultsPersister extends AbstractComponent {
try (XContentBuilder content = toXContentBuilder(resultDoc)) {
bulkRequest.add(new IndexRequest(index, DOC_TYPE, id).source(content));
} catch (IOException e) {
logger.error("Error serialising result", e);
logger.error(new ParameterizedMessage("[{}] Error serialising result", jobId), e);
}
if (bulkRequest.numberOfActions() >= BULK_LIMIT) {
executeRequest();

View File

@@ -94,29 +94,21 @@ public class JobResultsPersister extends AbstractComponent {
bucketWithoutRecords = new Bucket(bucket);
bucketWithoutRecords.setRecords(Collections.emptyList());
}
try (XContentBuilder content = toXContentBuilder(bucketWithoutRecords)) {
String id = bucketWithoutRecords.getId();
logger.trace("[{}] ES API CALL: index bucket to index [{}] with ID [{}]", jobId, indexName, id);
bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, id).source(content));
indexResult(id, bucketWithoutRecords, "bucket");
persistBucketInfluencersStandalone(jobId, bucketWithoutRecords.getBucketInfluencers());
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error serialising bucket", jobId), e);
}
return this;
}
private void persistBucketInfluencersStandalone(String jobId, List<BucketInfluencer> bucketInfluencers) throws IOException {
private void persistBucketInfluencersStandalone(String jobId, List<BucketInfluencer> bucketInfluencers) {
if (bucketInfluencers != null && bucketInfluencers.isEmpty() == false) {
for (BucketInfluencer bucketInfluencer : bucketInfluencers) {
try (XContentBuilder content = toXContentBuilder(bucketInfluencer)) {
// Need consistent IDs to ensure overwriting on renormalization
String id = bucketInfluencer.getId();
logger.trace("[{}] ES BULK ACTION: index bucket influencer to index [{}] with ID [{}]", jobId, indexName, id);
bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, id).source(content));
}
indexResult(id, bucketInfluencer, "bucket influencer");
}
}
}
@@ -128,17 +120,9 @@ public class JobResultsPersister extends AbstractComponent {
* @return this
*/
public Builder persistRecords(List<AnomalyRecord> records) {
try {
for (AnomalyRecord record : records) {
try (XContentBuilder content = toXContentBuilder(record)) {
String id = record.getId();
logger.trace("[{}] ES BULK ACTION: index record to index [{}] with ID [{}]", jobId, indexName, id);
bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, id).source(content));
}
}
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error serialising records", new Object [] {jobId}), e);
logger.trace("[{}] ES BULK ACTION: index record to index [{}] with ID [{}]", jobId, indexName, record.getId());
indexResult(record.getId(), record, "record");
}
return this;
@@ -152,21 +136,32 @@ public class JobResultsPersister extends AbstractComponent {
* @return this
*/
public Builder persistInfluencers(List<Influencer> influencers) {
try {
for (Influencer influencer : influencers) {
try (XContentBuilder content = toXContentBuilder(influencer)) {
String id = influencer.getId();
logger.trace("[{}] ES BULK ACTION: index influencer to index [{}] with ID [{}]", jobId, indexName, id);
bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, id).source(content));
}
}
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error serialising influencers", new Object[] {jobId}), e);
logger.trace("[{}] ES BULK ACTION: index influencer to index [{}] with ID [{}]", jobId, indexName, influencer.getId());
indexResult(influencer.getId(), influencer, "influencer");
}
return this;
}
public Builder persistModelPlot(ModelPlot modelPlot) {
logger.trace("[{}] ES BULK ACTION: index model plot to index [{}] with ID [{}]", jobId, indexName, modelPlot.getId());
indexResult(modelPlot.getId(), modelPlot, "model plot");
return this;
}
private void indexResult(String id, ToXContent resultDoc, String resultType) {
try (XContentBuilder content = toXContentBuilder(resultDoc)) {
bulkRequest.add(new IndexRequest(indexName, DOC_TYPE, id).source(content));
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error serialising {}", jobId, resultType), e);
}
if (bulkRequest.numberOfActions() >= JobRenormalizedResultsPersister.BULK_LIMIT) {
executeRequest();
}
}
/**
* Execute the bulk action
*/
@@ -254,16 +249,6 @@ public class JobResultsPersister extends AbstractComponent {
// for information at the API level
}
/**
* Persist model plot output
*/
public void persistModelPlot(ModelPlot modelPlot) {
Persistable persistable = new Persistable(modelPlot.getJobId(), modelPlot, modelPlot.getId());
persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(modelPlot.getJobId())).actionGet();
// Don't commit as we expect masses of these updates and they're not
// read again by this process
}
/**
* Delete any existing interim results synchronously
*/

View File

@@ -191,7 +191,7 @@ public class AutoDetectResultProcessor {
}
ModelPlot modelPlot = result.getModelPlot();
if (modelPlot != null) {
persister.persistModelPlot(modelPlot);
context.bulkResultsPersister.persistModelPlot(modelPlot);
}
ModelSizeStats modelSizeStats = result.getModelSizeStats();
if (modelSizeStats != null) {

View File

@@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.job.persistence;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
@@ -12,10 +13,16 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.process.normalizer.BucketNormalizable;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.BucketInfluencer;
import org.elasticsearch.xpack.ml.job.results.ModelPlot;
import org.mockito.ArgumentCaptor;
import java.util.Date;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class JobRenormalizedResultsPersisterTests extends ESTestCase {
@@ -37,6 +44,20 @@ public class JobRenormalizedResultsPersisterTests extends ESTestCase {
assertEquals(0, persister.getBulkRequest().numberOfActions());
}
public void testBulkRequestExecutesWhenReachMaxDocs() {
BulkResponse bulkResponse = mock(BulkResponse.class);
Client client = new MockClientBuilder("cluster").bulk(bulkResponse).build();
JobRenormalizedResultsPersister persister = new JobRenormalizedResultsPersister("foo", Settings.EMPTY, client);
ModelPlot modelPlot = new ModelPlot("foo", new Date(), 123456);
for (int i=0; i<=JobRenormalizedResultsPersister.BULK_LIMIT; i++) {
persister.updateResult("bar", "index-foo", modelPlot);
}
verify(client, times(1)).bulk(any());
verifyNoMoreInteractions(client);
}
private JobRenormalizedResultsPersister createJobRenormalizedResultsPersister() {
BulkResponse bulkResponse = mock(BulkResponse.class);
when(bulkResponse.hasFailures()).thenReturn(false);

View File

@@ -17,6 +17,7 @@ import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.BucketInfluencer;
import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.elasticsearch.xpack.ml.job.results.ModelPlot;
import org.mockito.ArgumentCaptor;
import java.io.IOException;
@@ -25,7 +26,11 @@ import java.util.Collections;
import java.util.Date;
import java.util.List;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@@ -172,6 +177,21 @@ public class JobResultsPersisterTests extends ESTestCase {
assertEquals(0, builder.getBulkRequest().numberOfActions());
}
public void testBulkRequestExecutesWhenReachMaxDocs() {
ArgumentCaptor<BulkRequest> captor = ArgumentCaptor.forClass(BulkRequest.class);
Client client = mockClient(captor);
JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client);
JobResultsPersister.Builder bulkBuilder = persister.bulkPersisterBuilder("foo");
ModelPlot modelPlot = new ModelPlot("foo", new Date(), 123456);
for (int i=0; i<=JobRenormalizedResultsPersister.BULK_LIMIT; i++) {
bulkBuilder.persistModelPlot(modelPlot);
}
verify(client, times(1)).bulk(any());
verifyNoMoreInteractions(client);
}
@SuppressWarnings({"unchecked", "rawtypes"})
private Client mockClient(ArgumentCaptor<BulkRequest> captor) {
Client client = mock(Client.class);

View File

@@ -240,7 +240,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
when(result.getModelPlot()).thenReturn(modelPlot);
processorUnderTest.processResult(context, result);
verify(persister, times(1)).persistModelPlot(modelPlot);
verify(bulkBuilder, times(1)).persistModelPlot(modelPlot);
verifyNoMoreInteractions(persister);
}
@@ -314,8 +314,8 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
public void testPersisterThrowingDoesntBlockProcessing() {
AutodetectResult autodetectResult = mock(AutodetectResult.class);
ModelSizeStats modelSizeStats = mock(ModelSizeStats.class);
when(autodetectResult.getModelSizeStats()).thenReturn(modelSizeStats);
ModelSnapshot modelSnapshot = mock(ModelSnapshot.class);
when(autodetectResult.getModelSnapshot()).thenReturn(modelSnapshot);
@SuppressWarnings("unchecked")
Iterator<AutodetectResult> iterator = mock(Iterator.class);
@@ -324,10 +324,10 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
AutodetectProcess process = mock(AutodetectProcess.class);
when(process.readAutodetectResults()).thenReturn(iterator);
doThrow(new ElasticsearchException("this test throws")).when(persister).persistModelSizeStats(any());
doThrow(new ElasticsearchException("this test throws")).when(persister).persistModelSnapshot(any());
processorUnderTest.process(process);
verify(persister, times(2)).persistModelSizeStats(any());
verify(persister, times(2)).persistModelSnapshot(any());
}
public void testParsingErrorSetsFailed() {