Fix results not being persisted (elastic/elasticsearch#489)

Original commit: elastic/x-pack-elasticsearch@d0ee02ccf6
This commit is contained in:
David Kyle 2016-12-07 16:52:58 +00:00 committed by GitHub
parent 7cc2b8c5ce
commit af61a51e22
2 changed files with 5 additions and 1 deletions

View File

@ -109,7 +109,7 @@ public class AutoDetectResultProcessor {
// persist after deleting interim results in case the new // persist after deleting interim results in case the new
// results are also interim // results are also interim
context.bulkResultsPersister.persistBucket(bucket); context.bulkResultsPersister.persistBucket(bucket).executeRequest();
context.bulkResultsPersister = persister.bulkPersisterBuilder(context.jobId); context.bulkResultsPersister = persister.bulkPersisterBuilder(context.jobId);

View File

@ -65,6 +65,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister persister = mock(JobResultsPersister.class); JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null); AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder); AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
@ -75,6 +76,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
processor.processResult(context, result); processor.processResult(context, result);
verify(bulkBuilder, times(1)).persistBucket(bucket); verify(bulkBuilder, times(1)).persistBucket(bucket);
verify(bulkBuilder, times(1)).executeRequest();
verify(persister, times(1)).bulkPersisterBuilder(JOB_ID); verify(persister, times(1)).bulkPersisterBuilder(JOB_ID);
verify(persister, never()).deleteInterimResults(JOB_ID); verify(persister, never()).deleteInterimResults(JOB_ID);
verifyNoMoreInteractions(persister); verifyNoMoreInteractions(persister);
@ -85,6 +87,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
JobResultsPersister persister = mock(JobResultsPersister.class); JobResultsPersister persister = mock(JobResultsPersister.class);
JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null); AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder); AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
@ -94,6 +97,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
processor.processResult(context, result); processor.processResult(context, result);
verify(bulkBuilder, times(1)).persistBucket(bucket); verify(bulkBuilder, times(1)).persistBucket(bucket);
verify(bulkBuilder, times(1)).executeRequest();
verify(persister, times(1)).deleteInterimResults(JOB_ID); verify(persister, times(1)).deleteInterimResults(JOB_ID);
verify(persister, times(1)).bulkPersisterBuilder(JOB_ID); verify(persister, times(1)).bulkPersisterBuilder(JOB_ID);
verifyNoMoreInteractions(persister); verifyNoMoreInteractions(persister);