Fix bug updating normalised results (elastic/elasticsearch#765)

The bulk request needed resetting after it was executed otherwise stale documents are persisted repeatedly after they have been updated causing a versioning error

Original commit: elastic/x-pack-elasticsearch@263fa9d25d
This commit is contained in:
David Kyle 2017-01-20 17:33:37 +00:00 committed by GitHub
parent 4c6989212a
commit ecd462bf89
3 changed files with 37 additions and 8 deletions

View File

@ -92,6 +92,8 @@ public class JobRenormalizedResultsPersister extends AbstractComponent {
if (addRecordsResponse.hasFailures()) { if (addRecordsResponse.hasFailures()) {
logger.error("[{}] Bulk index of results has errors: {}", jobId, addRecordsResponse.buildFailureMessage()); logger.error("[{}] Bulk index of results has errors: {}", jobId, addRecordsResponse.buildFailureMessage());
} }
bulkRequest = new BulkRequest();
} }
BulkRequest getBulkRequest() { BulkRequest getBulkRequest() {

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.ml.job.persistence; package org.elasticsearch.xpack.ml.job.persistence;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
@ -14,16 +15,13 @@ import org.elasticsearch.xpack.ml.job.results.BucketInfluencer;
import java.util.Date; import java.util.Date;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class JobRenormalizedResultsPersisterTests extends ESTestCase { public class JobRenormalizedResultsPersisterTests extends ESTestCase {
public void testUpdateBucket() { public void testUpdateBucket() {
Date now = new Date(); BucketNormalizable bn = createBucketNormalizable();
Bucket bucket = new Bucket("foo", now, 1);
int sequenceNum = 0;
bucket.addBucketInfluencer(new BucketInfluencer("foo", now, 1, sequenceNum++));
bucket.addBucketInfluencer(new BucketInfluencer("foo", now, 1, sequenceNum++));
BucketNormalizable bn = new BucketNormalizable(bucket, "foo-index");
JobRenormalizedResultsPersister persister = createJobRenormalizedResultsPersister(); JobRenormalizedResultsPersister persister = createJobRenormalizedResultsPersister();
persister.updateBucket(bn); persister.updateBucket(bn);
@ -31,8 +29,28 @@ public class JobRenormalizedResultsPersisterTests extends ESTestCase {
assertEquals("foo-index", persister.getBulkRequest().requests().get(0).index()); assertEquals("foo-index", persister.getBulkRequest().requests().get(0).index());
} }
public void testExecuteRequestResetsBulkRequest() {
BucketNormalizable bn = createBucketNormalizable();
JobRenormalizedResultsPersister persister = createJobRenormalizedResultsPersister();
persister.updateBucket(bn);
persister.executeRequest("foo");
assertEquals(0, persister.getBulkRequest().numberOfActions());
}
private JobRenormalizedResultsPersister createJobRenormalizedResultsPersister() { private JobRenormalizedResultsPersister createJobRenormalizedResultsPersister() {
Client client = new MockClientBuilder("cluster").build(); BulkResponse bulkResponse = mock(BulkResponse.class);
when(bulkResponse.hasFailures()).thenReturn(false);
Client client = new MockClientBuilder("cluster").bulk(bulkResponse).build();
return new JobRenormalizedResultsPersister(Settings.EMPTY, client); return new JobRenormalizedResultsPersister(Settings.EMPTY, client);
} }
private BucketNormalizable createBucketNormalizable() {
Date now = new Date();
Bucket bucket = new Bucket("foo", now, 1);
int sequenceNum = 0;
bucket.addBucketInfluencer(new BucketInfluencer("foo", now, 1, sequenceNum++));
bucket.addBucketInfluencer(new BucketInfluencer("foo", now, 1, sequenceNum++));
return new BucketNormalizable(bucket, "foo-index");
}
} }

View File

@ -20,6 +20,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetRequestBuilder; import org.elasticsearch.action.get.GetRequestBuilder;
@ -302,6 +303,14 @@ public class MockClientBuilder {
return this; return this;
} }
@SuppressWarnings("unchecked")
public MockClientBuilder bulk(BulkResponse response) {
ActionFuture<BulkResponse> actionFuture = mock(ActionFuture.class);
when(client.bulk(any(BulkRequest.class))).thenReturn(actionFuture);
when(actionFuture.actionGet()).thenReturn(response);
return this;
}
public MockClientBuilder prepareUpdateScript(String index, String type, String id, ArgumentCaptor<Script> getSource, public MockClientBuilder prepareUpdateScript(String index, String type, String id, ArgumentCaptor<Script> getSource,
ArgumentCaptor<Map<String, Object>> getParams) { ArgumentCaptor<Map<String, Object>> getParams) {
UpdateRequestBuilder builder = mock(UpdateRequestBuilder.class); UpdateRequestBuilder builder = mock(UpdateRequestBuilder.class);