diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersister.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersister.java index b3109e969f5..d8b82f98c9c 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersister.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersister.java @@ -92,6 +92,8 @@ public class JobRenormalizedResultsPersister extends AbstractComponent { if (addRecordsResponse.hasFailures()) { logger.error("[{}] Bulk index of results has errors: {}", jobId, addRecordsResponse.buildFailureMessage()); } + + bulkRequest = new BulkRequest(); } BulkRequest getBulkRequest() { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersisterTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersisterTests.java index 1779f1dfd97..97ed255dc78 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersisterTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobRenormalizedResultsPersisterTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.job.persistence; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; @@ -14,16 +15,13 @@ import org.elasticsearch.xpack.ml.job.results.BucketInfluencer; import java.util.Date; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class JobRenormalizedResultsPersisterTests extends ESTestCase { public void testUpdateBucket() { - Date now = new Date(); - Bucket bucket = new Bucket("foo", now, 1); - int sequenceNum = 0; - bucket.addBucketInfluencer(new BucketInfluencer("foo", now, 1, sequenceNum++)); - bucket.addBucketInfluencer(new BucketInfluencer("foo", now, 1, sequenceNum++)); - BucketNormalizable bn = new BucketNormalizable(bucket, "foo-index"); - + BucketNormalizable bn = createBucketNormalizable(); JobRenormalizedResultsPersister persister = createJobRenormalizedResultsPersister(); persister.updateBucket(bn); @@ -31,8 +29,28 @@ public class JobRenormalizedResultsPersisterTests extends ESTestCase { assertEquals("foo-index", persister.getBulkRequest().requests().get(0).index()); } + public void testExecuteRequestResetsBulkRequest() { + BucketNormalizable bn = createBucketNormalizable(); + JobRenormalizedResultsPersister persister = createJobRenormalizedResultsPersister(); + persister.updateBucket(bn); + persister.executeRequest("foo"); + assertEquals(0, persister.getBulkRequest().numberOfActions()); + } + private JobRenormalizedResultsPersister createJobRenormalizedResultsPersister() { - Client client = new MockClientBuilder("cluster").build(); + BulkResponse bulkResponse = mock(BulkResponse.class); + when(bulkResponse.hasFailures()).thenReturn(false); + + Client client = new MockClientBuilder("cluster").bulk(bulkResponse).build(); return new JobRenormalizedResultsPersister(Settings.EMPTY, client); } + + private BucketNormalizable createBucketNormalizable() { + Date now = new Date(); + Bucket bucket = new Bucket("foo", now, 1); + int sequenceNum = 0; + bucket.addBucketInfluencer(new BucketInfluencer("foo", now, 1, sequenceNum++)); + bucket.addBucketInfluencer(new BucketInfluencer("foo", now, 1, sequenceNum++)); + return new BucketNormalizable(bucket, "foo-index"); + } } \ No newline at end of file diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java index af6ca712a3e..3c0af05f384 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java @@ -20,6 +20,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; +import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetRequestBuilder; @@ -302,6 +303,14 @@ public class MockClientBuilder { return this; } + @SuppressWarnings("unchecked") + public MockClientBuilder bulk(BulkResponse response) { + ActionFuture actionFuture = mock(ActionFuture.class); + when(client.bulk(any(BulkRequest.class))).thenReturn(actionFuture); + when(actionFuture.actionGet()).thenReturn(response); + return this; + } + public MockClientBuilder prepareUpdateScript(String index, String type, String id, ArgumentCaptor