mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 18:35:25 +00:00
[ML] Ignore non-bulk-action blocks in StateProcessor (elastic/x-pack-elasticsearch#1154)
This is in preparation for the autodetect process writing out a block of spaces in order to flush the buffer at the end of state persisting. Relates elastic/x-pack-elasticsearch#1140 Original commit: elastic/x-pack-elasticsearch@fedf1d204c
This commit is contained in:
parent
52c8469225
commit
546faa3b9b
@ -87,7 +87,9 @@ public class StateProcessor extends AbstractComponent {
|
||||
logger.trace("[{}] ES API CALL: bulk index", jobId);
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
bulkRequest.add(bytes, null, null, XContentType.JSON);
|
||||
client.bulk(bulkRequest).actionGet();
|
||||
if (bulkRequest.numberOfActions() > 0) {
|
||||
client.bulk(bulkRequest).actionGet();
|
||||
}
|
||||
}
|
||||
|
||||
private static int findNextZeroByte(BytesReference bytesRef, int searchFrom, int splitFrom) {
|
||||
|
@ -6,10 +6,15 @@
|
||||
package org.elasticsearch.xpack.ml.job.process.autodetect.output;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.annotations.Timeout;
|
||||
import org.elasticsearch.action.ActionFuture;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.mock.orig.Mockito;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
|
||||
@ -20,36 +25,46 @@ import java.util.List;
|
||||
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* Tests for reading state from the native process.
|
||||
*/
|
||||
public class StateProcessorTests extends ESTestCase {
|
||||
|
||||
private static final String STATE_SAMPLE = "first header\n"
|
||||
+ "first data\n"
|
||||
private static final String STATE_SAMPLE = ""
|
||||
+ "{\"index\": {\"_index\": \"test\", \"_type\": \"type1\", \"_id\": \"1\"}}\n"
|
||||
+ "{ \"field\" : \"value1\" }\n"
|
||||
+ "\0"
|
||||
+ "second header\n"
|
||||
+ "second data\n"
|
||||
+ "{\"index\": {\"_index\": \"test\", \"_type\": \"type1\", \"_id\": \"2\"}}\n"
|
||||
+ "{ \"field\" : \"value2\" }\n"
|
||||
+ "\0"
|
||||
+ "third header\n"
|
||||
+ "third data\n"
|
||||
+ "{\"index\": {\"_index\": \"test\", \"_type\": \"type1\", \"_id\": \"3\"}}\n"
|
||||
+ "{ \"field\" : \"value3\" }\n"
|
||||
+ "\0";
|
||||
|
||||
private static final int NUM_LARGE_DOCS = 2;
|
||||
private static final int LARGE_DOC_SIZE = 1000000;
|
||||
|
||||
private Client client;
|
||||
private ActionFuture<BulkResponse> bulkResponseFuture;
|
||||
private StateProcessor stateProcessor;
|
||||
|
||||
@Before
|
||||
public void initialize() throws IOException {
|
||||
stateProcessor = spy(new StateProcessor(Settings.EMPTY, mock(Client.class)));
|
||||
doNothing().when(stateProcessor).persist(any(), any());
|
||||
client = mock(Client.class);
|
||||
bulkResponseFuture = mock(ActionFuture.class);
|
||||
stateProcessor = spy(new StateProcessor(Settings.EMPTY, client));
|
||||
when(client.bulk(any(BulkRequest.class))).thenReturn(bulkResponseFuture);
|
||||
}
|
||||
|
||||
@After
|
||||
public void verifyNoMoreClientInteractions() {
|
||||
Mockito.verifyNoMoreInteractions(client);
|
||||
}
|
||||
|
||||
public void testStateRead() throws IOException {
|
||||
@ -63,6 +78,27 @@ public class StateProcessorTests extends ESTestCase {
|
||||
assertEquals(threeStates[0], capturedBytes.get(0).utf8ToString());
|
||||
assertEquals(threeStates[1], capturedBytes.get(1).utf8ToString());
|
||||
assertEquals(threeStates[2], capturedBytes.get(2).utf8ToString());
|
||||
verify(client, times(3)).bulk(any(BulkRequest.class));
|
||||
}
|
||||
|
||||
public void testStateReadGivenConsecutiveZeroBytes() throws IOException {
|
||||
String zeroBytes = "\0\0\0\0\0\0";
|
||||
ByteArrayInputStream stream = new ByteArrayInputStream(zeroBytes.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
stateProcessor.process("_id", stream);
|
||||
|
||||
verify(stateProcessor, times(6)).persist(eq("_id"), any());
|
||||
Mockito.verifyNoMoreInteractions(client);
|
||||
}
|
||||
|
||||
public void testStateReadGivenConsecutiveSpacesFollowedByZeroByte() throws IOException {
|
||||
String zeroBytes = " \0";
|
||||
ByteArrayInputStream stream = new ByteArrayInputStream(zeroBytes.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
stateProcessor.process("_id", stream);
|
||||
|
||||
verify(stateProcessor, times(1)).persist(eq("_id"), any());
|
||||
Mockito.verifyNoMoreInteractions(client);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -84,5 +120,6 @@ public class StateProcessorTests extends ESTestCase {
|
||||
ByteArrayInputStream stream = new ByteArrayInputStream(builder.toString().getBytes(StandardCharsets.UTF_8));
|
||||
stateProcessor.process("_id", stream);
|
||||
verify(stateProcessor, times(NUM_LARGE_DOCS)).persist(eq("_id"), any());
|
||||
verify(client, times(NUM_LARGE_DOCS)).bulk(any(BulkRequest.class));
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user