diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java index 1aba5303285..2c9c7bf564b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java @@ -73,8 +73,11 @@ public class StateProcessor extends AbstractComponent { // No more zero bytes in this block break; } - // No validation - assume the native process has formatted the state correctly - persist(jobId, bytesRef.slice(splitFrom, nextZeroByte - splitFrom)); + // Ignore completely empty chunks + if (nextZeroByte > splitFrom) { + // No validation - assume the native process has formatted the state correctly + persist(jobId, bytesRef.slice(splitFrom, nextZeroByte - splitFrom)); + } splitFrom = nextZeroByte + 1; } if (splitFrom >= bytesRef.length()) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessorTests.java index 94785055cf5..70cfb6f8262 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessorTests.java @@ -26,6 +26,7 @@ import java.util.List; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -51,13 +52,13 @@ public class StateProcessorTests extends ESTestCase { private static final int LARGE_DOC_SIZE = 1000000; private Client client; - private ActionFuture bulkResponseFuture; private StateProcessor stateProcessor; @Before public void initialize() throws IOException { client = mock(Client.class); - bulkResponseFuture = mock(ActionFuture.class); + @SuppressWarnings("unchecked") + ActionFuture bulkResponseFuture = mock(ActionFuture.class); stateProcessor = spy(new StateProcessor(Settings.EMPTY, client)); when(client.bulk(any(BulkRequest.class))).thenReturn(bulkResponseFuture); } @@ -87,12 +88,12 @@ public class StateProcessorTests extends ESTestCase { stateProcessor.process("_id", stream); - verify(stateProcessor, times(6)).persist(eq("_id"), any()); + verify(stateProcessor, never()).persist(eq("_id"), any()); Mockito.verifyNoMoreInteractions(client); } public void testStateReadGivenConsecutiveSpacesFollowedByZeroByte() throws IOException { - String zeroBytes = " \0"; + String zeroBytes = " \n\0"; ByteArrayInputStream stream = new ByteArrayInputStream(zeroBytes.getBytes(StandardCharsets.UTF_8)); stateProcessor.process("_id", stream);