[ML] Fix fallout from bulk action requiring newlines (elastic/x-pack-elasticsearch#2205)

Only unit tests were broken.  Production ML code was always terminating
bulk requests with newlines.

Original commit: elastic/x-pack-elasticsearch@96ed06fed3
This commit is contained in:
David Roberts 2017-08-08 11:07:13 +01:00 committed by GitHub
parent 55e88d6857
commit 6a159d2127
2 changed files with 10 additions and 6 deletions

View File

@ -73,8 +73,11 @@ public class StateProcessor extends AbstractComponent {
// No more zero bytes in this block // No more zero bytes in this block
break; break;
} }
// Ignore completely empty chunks
if (nextZeroByte > splitFrom) {
// No validation - assume the native process has formatted the state correctly // No validation - assume the native process has formatted the state correctly
persist(jobId, bytesRef.slice(splitFrom, nextZeroByte - splitFrom)); persist(jobId, bytesRef.slice(splitFrom, nextZeroByte - splitFrom));
}
splitFrom = nextZeroByte + 1; splitFrom = nextZeroByte + 1;
} }
if (splitFrom >= bytesRef.length()) { if (splitFrom >= bytesRef.length()) {

View File

@ -26,6 +26,7 @@ import java.util.List;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -51,13 +52,13 @@ public class StateProcessorTests extends ESTestCase {
private static final int LARGE_DOC_SIZE = 1000000; private static final int LARGE_DOC_SIZE = 1000000;
private Client client; private Client client;
private ActionFuture<BulkResponse> bulkResponseFuture;
private StateProcessor stateProcessor; private StateProcessor stateProcessor;
@Before @Before
public void initialize() throws IOException { public void initialize() throws IOException {
client = mock(Client.class); client = mock(Client.class);
bulkResponseFuture = mock(ActionFuture.class); @SuppressWarnings("unchecked")
ActionFuture<BulkResponse> bulkResponseFuture = mock(ActionFuture.class);
stateProcessor = spy(new StateProcessor(Settings.EMPTY, client)); stateProcessor = spy(new StateProcessor(Settings.EMPTY, client));
when(client.bulk(any(BulkRequest.class))).thenReturn(bulkResponseFuture); when(client.bulk(any(BulkRequest.class))).thenReturn(bulkResponseFuture);
} }
@ -87,12 +88,12 @@ public class StateProcessorTests extends ESTestCase {
stateProcessor.process("_id", stream); stateProcessor.process("_id", stream);
verify(stateProcessor, times(6)).persist(eq("_id"), any()); verify(stateProcessor, never()).persist(eq("_id"), any());
Mockito.verifyNoMoreInteractions(client); Mockito.verifyNoMoreInteractions(client);
} }
public void testStateReadGivenConsecutiveSpacesFollowedByZeroByte() throws IOException { public void testStateReadGivenConsecutiveSpacesFollowedByZeroByte() throws IOException {
String zeroBytes = " \0"; String zeroBytes = " \n\0";
ByteArrayInputStream stream = new ByteArrayInputStream(zeroBytes.getBytes(StandardCharsets.UTF_8)); ByteArrayInputStream stream = new ByteArrayInputStream(zeroBytes.getBytes(StandardCharsets.UTF_8));
stateProcessor.process("_id", stream); stateProcessor.process("_id", stream);