diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java index 74eb3c809ff..9e20331f0a4 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/persistence/JobResultsPersister.java @@ -192,7 +192,11 @@ public class JobResultsPersister extends AbstractComponent { * Execute the bulk action */ public void executeRequest() { + if (bulkRequest.numberOfActions() == 0) { + return; + } logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, bulkRequest.numberOfActions()); + BulkResponse addRecordsResponse = bulkRequest.execute().actionGet(); if (addRecordsResponse.hasFailures()) { logger.error("[{}] Bulk index of results has errors: {}", jobId, addRecordsResponse.buildFailureMessage()); diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessor.java index 9771c49dd1b..7cb648d58fd 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -159,6 +159,7 @@ public class AutoDetectResultProcessor { // Commit previous writes here, effectively continuing // the flush from the C++ autodetect process right // through to the data store + context.bulkResultsPersister.executeRequest(); persister.commitResultWrites(context.jobId); flushListener.acknowledgeFlush(flushAcknowledgement.getId()); // Interim results may have been produced by the flush, diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/AutodetectResultProcessorIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/AutodetectResultProcessorIT.java index f2702c3f190..9431c10564b 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/AutodetectResultProcessorIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/AutodetectResultProcessorIT.java @@ -240,6 +240,47 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { assertResultsAreSame(finalAnomalyRecords, persistedRecords); } + public void testEndOfStreamTriggersPersisting() throws IOException, InterruptedException { + createJob(); + + AutoDetectResultProcessor resultProcessor = + new AutoDetectResultProcessor(renormalizer, jobResultsPersister, autodetectResultsParser); + + PipedOutputStream outputStream = new PipedOutputStream(); + PipedInputStream inputStream = new PipedInputStream(outputStream); + + Bucket bucket = createBucket(false); + List firstSetOfRecords = createRecords(false); + List secondSetOfRecords = createRecords(false); + + ResultsBuilder resultBuilder = new ResultsBuilder() + .start() + .addRecords(firstSetOfRecords) + .addBucket(bucket) // bucket triggers persistence + .addRecords(secondSetOfRecords) + .end(); // end of stream should persist the second bunch of records + + new Thread(() -> { + try { + writeResults(resultBuilder.build(), outputStream); + } catch (IOException e) { + } + }).start(); + + resultProcessor.process(JOB_ID, inputStream, false); + jobResultsPersister.commitResultWrites(JOB_ID); + + QueryPage persistedBucket = jobProvider.buckets(JOB_ID, new BucketsQueryBuilder().includeInterim(true).build()); + assertEquals(1, persistedBucket.count()); + + QueryPage persistedRecords = jobProvider.records(JOB_ID, + new RecordsQueryBuilder().size(200).includeInterim(true).build()); + + List allRecords = new ArrayList<>(firstSetOfRecords); + allRecords.addAll(secondSetOfRecords); + assertResultsAreSame(allRecords, persistedRecords); + } + private void writeResults(XContentBuilder builder, OutputStream out) throws IOException { builder.bytes().writeTo(out); } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java index 1737cab98d3..b4a9ee51fb0 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -175,6 +175,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { Renormalizer renormalizer = mock(Renormalizer.class); JobResultsPersister persister = mock(JobResultsPersister.class); JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); + when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null); AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder); @@ -184,6 +185,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { when(result.getCategoryDefinition()).thenReturn(categoryDefinition); processor.processResult(context, result); + verify(bulkBuilder, never()).executeRequest(); verify(persister, times(1)).persistCategoryDefinition(categoryDefinition); verifyNoMoreInteractions(persister); } @@ -192,6 +194,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { Renormalizer renormalizer = mock(Renormalizer.class); JobResultsPersister persister = mock(JobResultsPersister.class); JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); + when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); FlushListener flushListener = mock(FlushListener.class); AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null, flushListener); @@ -205,7 +208,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { verify(flushListener, times(1)).acknowledgeFlush(JOB_ID); verify(persister, times(1)).commitResultWrites(JOB_ID); - verify(bulkBuilder, never()).executeRequest(); + verify(bulkBuilder, times(1)).executeRequest(); verifyNoMoreInteractions(persister); assertTrue(context.deleteInterimRequired); } @@ -226,13 +229,13 @@ public class AutoDetectResultProcessorTests extends ESTestCase { CategoryDefinition categoryDefinition = mock(CategoryDefinition.class); when(result.getCategoryDefinition()).thenReturn(categoryDefinition); - InOrder inOrder = inOrder(persister, flushListener); + InOrder inOrder = inOrder(persister, bulkBuilder, flushListener); processor.processResult(context, result); inOrder.verify(persister, times(1)).persistCategoryDefinition(categoryDefinition); + inOrder.verify(bulkBuilder, times(1)).executeRequest(); inOrder.verify(persister, times(1)).commitResultWrites(JOB_ID); inOrder.verify(flushListener, times(1)).acknowledgeFlush(JOB_ID); - verify(bulkBuilder, never()).executeRequest(); verifyNoMoreInteractions(persister); assertTrue(context.deleteInterimRequired); }