[ML] Clear bulk requests after execution (elastic/x-pack-elasticsearch#912)

Original commit: elastic/x-pack-elasticsearch@31a4c7e99a
This commit is contained in:
David Kyle 2017-03-31 15:37:01 +01:00 committed by GitHub
parent 878c7a4eb9
commit 461bf314fa
4 changed files with 24 additions and 3 deletions

View File

@ -202,6 +202,13 @@ public class JobResultsPersister extends AbstractComponent {
if (addRecordsResponse.hasFailures()) { if (addRecordsResponse.hasFailures()) {
logger.error("[{}] Bulk index of results has errors: {}", jobId, addRecordsResponse.buildFailureMessage()); logger.error("[{}] Bulk index of results has errors: {}", jobId, addRecordsResponse.buildFailureMessage());
} }
bulkRequest = new BulkRequest();
}
// for testing
BulkRequest getBulkRequest() {
return bulkRequest;
} }
} }

View File

@ -129,7 +129,6 @@ public class AutoDetectResultProcessor {
// persist after deleting interim results in case the new // persist after deleting interim results in case the new
// results are also interim // results are also interim
context.bulkResultsPersister.persistBucket(bucket).executeRequest(); context.bulkResultsPersister.persistBucket(bucket).executeRequest();
context.bulkResultsPersister = persister.bulkPersisterBuilder(context.jobId);
} }
List<AnomalyRecord> records = result.getRecords(); List<AnomalyRecord> records = result.getRecords();
if (records != null && !records.isEmpty()) { if (records != null && !records.isEmpty()) {

View File

@ -157,6 +157,23 @@ public class JobResultsPersisterTests extends ESTestCase {
assertTrue(s.matches(".*influencer_score.:16\\.0.*")); assertTrue(s.matches(".*influencer_score.:16\\.0.*"));
} }
public void testExecuteRequest_ClearsBulkRequest() {
ArgumentCaptor<BulkRequest> captor = ArgumentCaptor.forClass(BulkRequest.class);
Client client = mockClient(captor);
JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client);
List<Influencer> influencers = new ArrayList<>();
Influencer inf = new Influencer(JOB_ID, "infName1", "infValue1", new Date(), 600, 1);
inf.setInfluencerScore(16);
inf.setInitialInfluencerScore(55.5);
inf.setProbability(0.4);
influencers.add(inf);
JobResultsPersister.Builder builder = persister.bulkPersisterBuilder(JOB_ID);
builder.persistInfluencers(influencers).executeRequest();
assertEquals(0, builder.getBulkRequest().numberOfActions());
}
@SuppressWarnings({"unchecked", "rawtypes"}) @SuppressWarnings({"unchecked", "rawtypes"})
private Client mockClient(ArgumentCaptor<BulkRequest> captor) { private Client mockClient(ArgumentCaptor<BulkRequest> captor) {
Client client = mock(Client.class); Client client = mock(Client.class);

View File

@ -88,7 +88,6 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
verify(bulkBuilder, times(1)).persistBucket(bucket); verify(bulkBuilder, times(1)).persistBucket(bucket);
verify(bulkBuilder, times(1)).executeRequest(); verify(bulkBuilder, times(1)).executeRequest();
verify(persister, times(1)).bulkPersisterBuilder(JOB_ID);
verify(persister, never()).deleteInterimResults(JOB_ID); verify(persister, never()).deleteInterimResults(JOB_ID);
verifyNoMoreInteractions(persister); verifyNoMoreInteractions(persister);
} }
@ -108,7 +107,6 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
verify(bulkBuilder, times(1)).persistBucket(bucket); verify(bulkBuilder, times(1)).persistBucket(bucket);
verify(bulkBuilder, times(1)).executeRequest(); verify(bulkBuilder, times(1)).executeRequest();
verify(persister, times(1)).deleteInterimResults(JOB_ID); verify(persister, times(1)).deleteInterimResults(JOB_ID);
verify(persister, times(1)).bulkPersisterBuilder(JOB_ID);
verifyNoMoreInteractions(persister); verifyNoMoreInteractions(persister);
assertFalse(context.deleteInterimRequired); assertFalse(context.deleteInterimRequired);
} }