From 6a159d212766e55bfa748a1d061c84dfd7134e38 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Tue, 8 Aug 2017 11:07:13 +0100 Subject: [PATCH] [ML] Fix fallout from bulk action requiring newlines (elastic/x-pack-elasticsearch#2205) Only unit tests were broken. Production ML code was always terminating bulk requests with newlines. Original commit: elastic/x-pack-elasticsearch@96ed06fed31bfc8bb86285d85d135e159e3f7f6a --- .../ml/job/process/autodetect/output/StateProcessor.java | 7 +++++-- .../process/autodetect/output/StateProcessorTests.java | 9 +++++---- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java index 1aba5303285..2c9c7bf564b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java @@ -73,8 +73,11 @@ public class StateProcessor extends AbstractComponent { // No more zero bytes in this block break; } - // No validation - assume the native process has formatted the state correctly - persist(jobId, bytesRef.slice(splitFrom, nextZeroByte - splitFrom)); + // Ignore completely empty chunks + if (nextZeroByte > splitFrom) { + // No validation - assume the native process has formatted the state correctly + persist(jobId, bytesRef.slice(splitFrom, nextZeroByte - splitFrom)); + } splitFrom = nextZeroByte + 1; } if (splitFrom >= bytesRef.length()) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessorTests.java index 94785055cf5..70cfb6f8262 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessorTests.java @@ -26,6 +26,7 @@ import java.util.List; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -51,13 +52,13 @@ public class StateProcessorTests extends ESTestCase { private static final int LARGE_DOC_SIZE = 1000000; private Client client; - private ActionFuture bulkResponseFuture; private StateProcessor stateProcessor; @Before public void initialize() throws IOException { client = mock(Client.class); - bulkResponseFuture = mock(ActionFuture.class); + @SuppressWarnings("unchecked") + ActionFuture bulkResponseFuture = mock(ActionFuture.class); stateProcessor = spy(new StateProcessor(Settings.EMPTY, client)); when(client.bulk(any(BulkRequest.class))).thenReturn(bulkResponseFuture); } @@ -87,12 +88,12 @@ public class StateProcessorTests extends ESTestCase { stateProcessor.process("_id", stream); - verify(stateProcessor, times(6)).persist(eq("_id"), any()); + verify(stateProcessor, never()).persist(eq("_id"), any()); Mockito.verifyNoMoreInteractions(client); } public void testStateReadGivenConsecutiveSpacesFollowedByZeroByte() throws IOException { - String zeroBytes = " \0"; + String zeroBytes = " \n\0"; ByteArrayInputStream stream = new ByteArrayInputStream(zeroBytes.getBytes(StandardCharsets.UTF_8)); stateProcessor.process("_id", stream);