Fix BulkRequest validation exception in the log (elastic/elasticsearch#610)

* Check that the bulk request contains actions before executing it.

This suppresses a validation exception about no requests being added.
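
For background, the exception this suppresses comes from request validation: BulkRequest.validate() reports an error whenever no sub-requests have been added, so executing an empty bulk request always logs a failure. A minimal sketch of that behaviour (the class EmptyBulkRequestSketch is hypothetical, written against the Elasticsearch 5.x client API):

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.bulk.BulkRequest;

// Hypothetical sketch, not part of this change: shows why an empty bulk
// request produces the logged validation exception.
public class EmptyBulkRequestSketch {
    public static void main(String[] args) {
        BulkRequest bulkRequest = new BulkRequest();
        // No index/delete requests have been added, so validate() returns a
        // non-null exception with a message along the lines of
        // "Validation Failed: 1: no requests added;"
        ActionRequestValidationException e = bulkRequest.validate();
        System.out.println(e == null ? "valid" : e.getMessage());
        // The guard added in this commit checks exactly this condition:
        System.out.println("actions: " + bulkRequest.numberOfActions());
    }
}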

* Persist the bulk request before refreshing the indexes on a flush acknowledgment

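The ordering matters here: commitResultWrites() refreshes the results indexes, so any results still buffered in the bulk request at that point would miss the refresh and be invisible to searches made after the flush is acknowledged. A fragment showing the intended order (taken from the AutoDetectResultProcessor change below; not standalone code):

// On a flush acknowledgement:
context.bulkResultsPersister.executeRequest();                  // 1. write out any buffered results
persister.commitResultWrites(context.jobId);                    // 2. refresh so the writes are searchable
flushListener.acknowledgeFlush(flushAcknowledgement.getId());   // 3. only then acknowledge the flush
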
Original commit: elastic/x-pack-elasticsearch@22543e46c8
Commit: 95cfae59ea (parent: 84b419052f)
Author: David Kyle
Date: 2016-12-22 11:23:58 +00:00 (committed by GitHub)
4 changed files with 52 additions and 3 deletions


@@ -192,7 +192,11 @@ public class JobResultsPersister extends AbstractComponent {
      * Execute the bulk action
      */
     public void executeRequest() {
+        if (bulkRequest.numberOfActions() == 0) {
+            return;
+        }
+
         logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, bulkRequest.numberOfActions());
         BulkResponse addRecordsResponse = bulkRequest.execute().actionGet();
         if (addRecordsResponse.hasFailures()) {
             logger.error("[{}] Bulk index of results has errors: {}", jobId, addRecordsResponse.buildFailureMessage());


@@ -159,6 +159,7 @@ public class AutoDetectResultProcessor {
                 // Commit previous writes here, effectively continuing
                 // the flush from the C++ autodetect process right
                 // through to the data store
+                context.bulkResultsPersister.executeRequest();
                 persister.commitResultWrites(context.jobId);
                 flushListener.acknowledgeFlush(flushAcknowledgement.getId());
                 // Interim results may have been produced by the flush,


@@ -240,6 +240,47 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
         assertResultsAreSame(finalAnomalyRecords, persistedRecords);
     }
 
+    public void testEndOfStreamTriggersPersisting() throws IOException, InterruptedException {
+        createJob();
+        AutoDetectResultProcessor resultProcessor =
+                new AutoDetectResultProcessor(renormalizer, jobResultsPersister, autodetectResultsParser);
+        PipedOutputStream outputStream = new PipedOutputStream();
+        PipedInputStream inputStream = new PipedInputStream(outputStream);
+        Bucket bucket = createBucket(false);
+        List<AnomalyRecord> firstSetOfRecords = createRecords(false);
+        List<AnomalyRecord> secondSetOfRecords = createRecords(false);
+
+        ResultsBuilder resultBuilder = new ResultsBuilder()
+                .start()
+                .addRecords(firstSetOfRecords)
+                .addBucket(bucket) // bucket triggers persistence
+                .addRecords(secondSetOfRecords)
+                .end(); // end of stream should persist the second bunch of records
+
+        new Thread(() -> {
+            try {
+                writeResults(resultBuilder.build(), outputStream);
+            } catch (IOException e) {
+            }
+        }).start();
+
+        resultProcessor.process(JOB_ID, inputStream, false);
+        jobResultsPersister.commitResultWrites(JOB_ID);
+
+        QueryPage<Bucket> persistedBucket = jobProvider.buckets(JOB_ID, new BucketsQueryBuilder().includeInterim(true).build());
+        assertEquals(1, persistedBucket.count());
+
+        QueryPage<AnomalyRecord> persistedRecords = jobProvider.records(JOB_ID,
+                new RecordsQueryBuilder().size(200).includeInterim(true).build());
+
+        List<AnomalyRecord> allRecords = new ArrayList<>(firstSetOfRecords);
+        allRecords.addAll(secondSetOfRecords);
+        assertResultsAreSame(allRecords, persistedRecords);
+    }
+
+    private void writeResults(XContentBuilder builder, OutputStream out) throws IOException {
+        builder.bytes().writeTo(out);
+    }


@@ -175,6 +175,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         Renormalizer renormalizer = mock(Renormalizer.class);
         JobResultsPersister persister = mock(JobResultsPersister.class);
         JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
+        when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
         AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null);
         AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, false, bulkBuilder);
@@ -184,6 +185,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         when(result.getCategoryDefinition()).thenReturn(categoryDefinition);
         processor.processResult(context, result);
 
+        verify(bulkBuilder, never()).executeRequest();
         verify(persister, times(1)).persistCategoryDefinition(categoryDefinition);
         verifyNoMoreInteractions(persister);
     }
@@ -192,6 +194,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         Renormalizer renormalizer = mock(Renormalizer.class);
         JobResultsPersister persister = mock(JobResultsPersister.class);
         JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
+        when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
         FlushListener flushListener = mock(FlushListener.class);
         AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormalizer, persister, null, flushListener);
@@ -205,7 +208,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         verify(flushListener, times(1)).acknowledgeFlush(JOB_ID);
         verify(persister, times(1)).commitResultWrites(JOB_ID);
-        verify(bulkBuilder, never()).executeRequest();
+        verify(bulkBuilder, times(1)).executeRequest();
         verifyNoMoreInteractions(persister);
         assertTrue(context.deleteInterimRequired);
     }
@@ -226,13 +229,13 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
         CategoryDefinition categoryDefinition = mock(CategoryDefinition.class);
         when(result.getCategoryDefinition()).thenReturn(categoryDefinition);
 
-        InOrder inOrder = inOrder(persister, flushListener);
+        InOrder inOrder = inOrder(persister, bulkBuilder, flushListener);
         processor.processResult(context, result);
 
         inOrder.verify(persister, times(1)).persistCategoryDefinition(categoryDefinition);
+        inOrder.verify(bulkBuilder, times(1)).executeRequest();
         inOrder.verify(persister, times(1)).commitResultWrites(JOB_ID);
         inOrder.verify(flushListener, times(1)).acknowledgeFlush(JOB_ID);
-        verify(bulkBuilder, never()).executeRequest();
         verifyNoMoreInteractions(persister);
         assertTrue(context.deleteInterimRequired);
     }