From 461bf314fa364d8b7bc4a8ee61b7c915ef2c4dd4 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 31 Mar 2017 15:37:01 +0100 Subject: [PATCH] [ML] Clear bulk requests after execution (elastic/x-pack-elasticsearch#912) Original commit: elastic/x-pack-elasticsearch@31a4c7e99a0e7b2f05381cb364f03441bc5cec19 --- .../ml/job/persistence/JobResultsPersister.java | 7 +++++++ .../output/AutoDetectResultProcessor.java | 1 - .../persistence/JobResultsPersisterTests.java | 17 +++++++++++++++++ .../output/AutoDetectResultProcessorTests.java | 2 -- 4 files changed, 24 insertions(+), 3 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java index b291a94075d..13413000774 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java @@ -202,6 +202,13 @@ public class JobResultsPersister extends AbstractComponent { if (addRecordsResponse.hasFailures()) { logger.error("[{}] Bulk index of results has errors: {}", jobId, addRecordsResponse.buildFailureMessage()); } + + bulkRequest = new BulkRequest(); + } + + // for testing + BulkRequest getBulkRequest() { + return bulkRequest; } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index ebecf342946..b809cf5cb2b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -129,7 +129,6 @@ public class AutoDetectResultProcessor { // persist after deleting interim results in case the new // results are also interim context.bulkResultsPersister.persistBucket(bucket).executeRequest(); - context.bulkResultsPersister = persister.bulkPersisterBuilder(context.jobId); } List records = result.getRecords(); if (records != null && !records.isEmpty()) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java index f04532d710b..cd9502f3672 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java @@ -157,6 +157,23 @@ public class JobResultsPersisterTests extends ESTestCase { assertTrue(s.matches(".*influencer_score.:16\\.0.*")); } + public void testExecuteRequest_ClearsBulkRequest() { + ArgumentCaptor captor = ArgumentCaptor.forClass(BulkRequest.class); + Client client = mockClient(captor); + JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client); + + List influencers = new ArrayList<>(); + Influencer inf = new Influencer(JOB_ID, "infName1", "infValue1", new Date(), 600, 1); + inf.setInfluencerScore(16); + inf.setInitialInfluencerScore(55.5); + inf.setProbability(0.4); + influencers.add(inf); + + JobResultsPersister.Builder builder = persister.bulkPersisterBuilder(JOB_ID); + builder.persistInfluencers(influencers).executeRequest(); + assertEquals(0, builder.getBulkRequest().numberOfActions()); + } + @SuppressWarnings({"unchecked", "rawtypes"}) private Client mockClient(ArgumentCaptor captor) { Client client = mock(Client.class); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index 0c62c073b3e..ffb82c2bc35 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -88,7 +88,6 @@ public class AutoDetectResultProcessorTests extends ESTestCase { verify(bulkBuilder, times(1)).persistBucket(bucket); verify(bulkBuilder, times(1)).executeRequest(); - verify(persister, times(1)).bulkPersisterBuilder(JOB_ID); verify(persister, never()).deleteInterimResults(JOB_ID); verifyNoMoreInteractions(persister); } @@ -108,7 +107,6 @@ public class AutoDetectResultProcessorTests extends ESTestCase { verify(bulkBuilder, times(1)).persistBucket(bucket); verify(bulkBuilder, times(1)).executeRequest(); verify(persister, times(1)).deleteInterimResults(JOB_ID); - verify(persister, times(1)).bulkPersisterBuilder(JOB_ID); verifyNoMoreInteractions(persister); assertFalse(context.deleteInterimRequired); }