commit c7cc383d33 (parent c77b80f01e)
@@ -9,10 +9,20 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;

@@ -22,10 +32,24 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * Reads state documents of a stream, splits them and persists to an index via a bulk request
 * Reads state documents of a stream, splits them and persists to an index via a bulk request.
 *
 * Some types of state, for example data frame analytics state and categorizer state, are written multiple times with the same document id.
 * The code needs to make sure that even after .ml-state index rollover there are no duplicate documents across the .ml-state*
 * indices. Such duplicates are undesirable for at least two reasons:
 *  1. We deliberately have no mappings on the state index so we cannot sort and filter in a search
 *  2. The state documents are large, so having dead documents with duplicate IDs is suboptimal from a disk usage perspective
 *
 * In order to avoid duplicates, the following sequence of steps is executed every time the document is about to get persisted:
 *  1. The first non-blank line is extracted from the given bytes. Lines are delimited by the new line character ('\n')
 *  2. Document id is extracted from this line.
 *  3. Document with this id is searched for in .ml-state* indices
 *  4. If the document is found, it is overwritten in place (i.e. in the same index) with the new content.
 *     Otherwise, it is written to the index pointed by the current write alias, i.e. .ml-state-write
 */
public class IndexingStateProcessor implements StateProcessor {

@@ -88,7 +112,7 @@ public class IndexingStateProcessor implements StateProcessor {
            // Ignore completely empty chunks
            if (nextZeroByte > splitFrom) {
                // No validation - assume the native process has formatted the state correctly
                persist(bytesRef.slice(splitFrom, nextZeroByte - splitFrom));
                findAppropriateIndexOrAliasAndPersist(bytesRef.slice(splitFrom, nextZeroByte - splitFrom));
            }
            splitFrom = nextZeroByte + 1;
        }
@@ -98,11 +122,25 @@ public class IndexingStateProcessor implements StateProcessor {
        return bytesRef.slice(splitFrom, bytesRef.length() - splitFrom);
    }

    void persist(BytesReference bytes) throws IOException {
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.add(bytes, AnomalyDetectorsIndex.jobStateIndexWriteAlias(), XContentType.JSON);
    /**
     * Finds an appropriate index the document should be put in and then persists the document in that index.
     * For what is considered to be "appropriate" see the class documentation.
     */
    void findAppropriateIndexOrAliasAndPersist(BytesReference bytes) throws IOException {
        String firstNonBlankLine = extractFirstNonBlankLine(bytes);
        if (firstNonBlankLine == null) {
            return;
        }
        String stateDocId = extractDocId(firstNonBlankLine);
        String indexOrAlias = getConcreteIndexOrWriteAlias(stateDocId);
        persist(indexOrAlias, bytes);
    }

    void persist(String indexOrAlias, BytesReference bytes) throws IOException {
        BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        bulkRequest.add(bytes, indexOrAlias, XContentType.JSON);
        if (bulkRequest.numberOfActions() > 0) {
            LOGGER.trace("[{}] Persisting job state document", jobId);
            LOGGER.trace("[{}] Persisting job state document: index [{}], length [{}]", jobId, indexOrAlias, bytes.length());
            try {
                resultsPersisterService.bulkIndexWithRetry(bulkRequest,
                    jobId,
@@ -117,12 +155,79 @@ public class IndexingStateProcessor implements StateProcessor {
    }

    private static int findNextZeroByte(BytesReference bytesRef, int searchFrom, int splitFrom) {
        for (int i = Math.max(searchFrom, splitFrom); i < bytesRef.length(); ++i) {
            if (bytesRef.get(i) == 0) {
                return i;
        return bytesRef.indexOf((byte)0, Math.max(searchFrom, splitFrom));
    }

    @SuppressWarnings("unchecked")
    /**
     * Extracts document id from the given {@code bytesRef}.
     * Only first non-blank line is parsed and document id is assumed to be a nested "index._id" field of type String.
     */
    static String extractDocId(String firstNonBlankLine) throws IOException {
        try (XContentParser parser =
                 JsonXContent.jsonXContent.createParser(
                     NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, firstNonBlankLine)) {
            Map<String, Object> map = parser.map();
            if ((map.get("index") instanceof Map) == false) {
                throw new IllegalStateException("Could not extract \"index\" field out of [" + firstNonBlankLine + "]");
            }
            map = (Map<String, Object>)map.get("index");
            if ((map.get("_id") instanceof String) == false) {
                throw new IllegalStateException("Could not extract \"index._id\" field out of [" + firstNonBlankLine + "]");
            }
            return (String)map.get("_id");
        }
    }

    /**
     * Extracts the first non-blank line from the given {@code bytesRef}.
     * Lines are separated by the new line character ('\n').
     * A line is considered blank if it only consists of space characters (' ').
     */
    private static String extractFirstNonBlankLine(BytesReference bytesRef) {
        for (int searchFrom = 0; searchFrom < bytesRef.length();) {
            int newLineMarkerIndex = bytesRef.indexOf((byte) '\n', searchFrom);
            int searchTo = newLineMarkerIndex != -1 ? newLineMarkerIndex : bytesRef.length();
            if (isBlank(bytesRef, searchFrom, searchTo) == false) {
                return bytesRef.slice(searchFrom, searchTo - searchFrom).utf8ToString();
            }
            searchFrom = newLineMarkerIndex != -1 ? newLineMarkerIndex + 1 : bytesRef.length();
        }
        return null;
    }

    /**
     * Checks whether the line pointed to by a pair of indexes: {@code from} (inclusive) and {@code to} (exclusive) is blank.
     * A line is considered blank if it only consists of space characters (' ').
     */
    private static boolean isBlank(BytesReference bytesRef, int from, int to) {
        for (int i = from; i < to; ++i) {
            if (bytesRef.get(i) != ((byte) ' ')) {
                return false;
            }
        }
        return -1;
        return true;
    }

    private String getConcreteIndexOrWriteAlias(String documentId) {
        Objects.requireNonNull(documentId);
        SearchRequest searchRequest =
            new SearchRequest(AnomalyDetectorsIndex.jobStateIndexPattern())
                .allowPartialSearchResults(false)
                .source(
                    new SearchSourceBuilder()
                        .size(1)
                        .trackTotalHits(false)
                        .query(new BoolQueryBuilder().filter(new IdsQueryBuilder().addIds(documentId))));
        SearchResponse searchResponse =
            resultsPersisterService.searchWithRetry(
                searchRequest,
                jobId,
                () -> true,
                (msg) -> auditor.warning(jobId, documentId + " " + msg));
        return searchResponse.getHits().getHits().length > 0
            ? searchResponse.getHits().getHits()[0].getIndex()
            : AnomalyDetectorsIndex.jobStateIndexWriteAlias();
    }
}
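
The class documentation in this diff describes a small routing algorithm: take the first non-blank line of a state chunk, read the document id out of the bulk "index" action line, look that id up across the existing .ml-state* indices, and persist to the concrete index that already holds the document, falling back to the write alias otherwise. The following standalone sketch is a reading aid only and is not part of the commit; the class name StateRoutingSketch, the regex-based id extraction, and the in-memory map are illustrative assumptions standing in for the XContentParser parsing and the search against .ml-state* that the real class performs.

// Standalone illustration of the routing decision described in the class Javadoc above.
// Assumptions: plain JDK only; StateRoutingSketch, chooseIndexOrAlias and the regex-based id
// extraction are invented for this sketch and stand in for the XContentParser parsing and the
// search against the .ml-state* indices performed by IndexingStateProcessor.
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class StateRoutingSketch {

    // Stand-in for AnomalyDetectorsIndex.jobStateIndexWriteAlias()
    private static final String WRITE_ALIAS = ".ml-state-write";
    private static final Pattern ID_PATTERN = Pattern.compile("\"_id\"\\s*:\\s*\"([^\"]+)\"");

    // Step 1: the first line that does not consist solely of spaces.
    static Optional<String> firstNonBlankLine(String stateChunk) {
        for (String line : stateChunk.split("\n")) {
            if (line.chars().anyMatch(c -> c != ' ')) {
                return Optional.of(line);
            }
        }
        return Optional.empty();
    }

    // Step 2: the document id from the bulk action line, e.g. {"index": {"_id": "doc1"}}.
    static Optional<String> extractDocId(String actionLine) {
        Matcher matcher = ID_PATTERN.matcher(actionLine);
        return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
    }

    // Steps 3 and 4: the concrete index already holding the document, or the write alias.
    static String chooseIndexOrAlias(String docId, Map<String, String> indexByDocId) {
        return indexByDocId.getOrDefault(docId, WRITE_ALIAS);
    }

    public static void main(String[] args) {
        String chunk = "   \n{\"index\": {\"_id\": \"categorizer_state#1\"}}\n{\"field\": \"value\"}\n";
        Map<String, String> existingDocs = Map.of("categorizer_state#1", ".ml-state-000001");

        firstNonBlankLine(chunk)
            .flatMap(StateRoutingSketch::extractDocId)
            .ifPresent(id -> System.out.println("persist to " + chooseIndexOrAlias(id, existingDocs)));
        // Prints ".ml-state-000001"; an unseen id would fall back to ".ml-state-write".
    }
}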
@@ -8,12 +8,13 @@ package org.elasticsearch.xpack.ml.process;
import com.carrotsearch.randomizedtesting.annotations.Timeout;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.mock.orig.Mockito;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;
import org.junit.After;
@@ -22,15 +23,23 @@ import org.mockito.ArgumentCaptor;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

/**
@@ -39,6 +48,7 @@ import static org.mockito.Mockito.when;
public class IndexingStateProcessorTests extends ESTestCase {

    private static final String STATE_SAMPLE = ""
        + " \n"
        + "{\"index\": {\"_index\": \"test\", \"_id\": \"1\"}}\n"
        + "{ \"field\" : \"value1\" }\n"
        + "\0"
@@ -56,54 +66,99 @@ public class IndexingStateProcessorTests extends ESTestCase {

    private IndexingStateProcessor stateProcessor;
    private ResultsPersisterService resultsPersisterService;
    private SearchResponse searchResponse;

    @Before
    public void initialize() {
        searchResponse = mock(SearchResponse.class);
        when(searchResponse.status()).thenReturn(RestStatus.OK);
        resultsPersisterService = mock(ResultsPersisterService.class);
        doReturn(searchResponse).when(resultsPersisterService).searchWithRetry(any(SearchRequest.class), any(), any(), any());
        doReturn(mock(BulkResponse.class)).when(resultsPersisterService).bulkIndexWithRetry(any(BulkRequest.class), any(), any(), any());
        AnomalyDetectionAuditor auditor = mock(AnomalyDetectionAuditor.class);
        stateProcessor = spy(new IndexingStateProcessor(JOB_ID, resultsPersisterService, auditor));
        when(resultsPersisterService.bulkIndexWithRetry(any(BulkRequest.class), any(), any(), any())).thenReturn(mock(BulkResponse.class));
        ThreadPool threadPool = mock(ThreadPool.class);
        when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
    }

    @After
    public void verifyNoMoreClientInteractions() {
        Mockito.verifyNoMoreInteractions(resultsPersisterService);
        verifyNoMoreInteractions(resultsPersisterService);
    }

    public void testStateRead() throws IOException {
    public void testExtractDocId() throws IOException {
        assertThat(IndexingStateProcessor.extractDocId("{ \"index\": {\"_index\": \"test\", \"_id\": \"1\" } }\n"), equalTo("1"));
        assertThat(IndexingStateProcessor.extractDocId("{ \"index\": {\"_id\": \"2\" } }\n"), equalTo("2"));
    }

    private void testStateRead(SearchHits searchHits, String expectedIndexOrAlias) throws IOException {
        when(searchResponse.getHits()).thenReturn(searchHits);

        ByteArrayInputStream stream = new ByteArrayInputStream(STATE_SAMPLE.getBytes(StandardCharsets.UTF_8));
        stateProcessor.process(stream);
        ArgumentCaptor<BytesReference> bytesRefCaptor = ArgumentCaptor.forClass(BytesReference.class);
        verify(stateProcessor, times(3)).persist(bytesRefCaptor.capture());
        verify(stateProcessor, times(3)).persist(eq(expectedIndexOrAlias), bytesRefCaptor.capture());

        String[] threeStates = STATE_SAMPLE.split("\0");
        List<BytesReference> capturedBytes = bytesRefCaptor.getAllValues();
        assertEquals(threeStates[0], capturedBytes.get(0).utf8ToString());
        assertEquals(threeStates[1], capturedBytes.get(1).utf8ToString());
        assertEquals(threeStates[2], capturedBytes.get(2).utf8ToString());
        verify(resultsPersisterService, times(3)).searchWithRetry(any(SearchRequest.class), any(), any(), any());
        verify(resultsPersisterService, times(3)).bulkIndexWithRetry(any(BulkRequest.class), any(), any(), any());
    }

    public void testStateRead_StateDocumentCreated() throws IOException {
        testStateRead(SearchHits.empty(), ".ml-state-write");
    }

    public void testStateRead_StateDocumentUpdated() throws IOException {
        testStateRead(
            new SearchHits(new SearchHit[]{ SearchHit.createFromMap(Collections.singletonMap("_index", ".ml-state-dummy")) }, null, 0.0f),
            ".ml-state-dummy");
    }

    public void testStateReadGivenConsecutiveZeroBytes() throws IOException {
        String zeroBytes = "\0\0\0\0\0\0";
        ByteArrayInputStream stream = new ByteArrayInputStream(zeroBytes.getBytes(StandardCharsets.UTF_8));

        stateProcessor.process(stream);

        verify(stateProcessor, never()).persist(any());
        Mockito.verifyNoMoreInteractions(resultsPersisterService);
        verify(stateProcessor, never()).persist(any(), any());
    }

    public void testStateReadGivenConsecutiveSpacesFollowedByZeroByte() throws IOException {
        String zeroBytes = " \n\0";
        ByteArrayInputStream stream = new ByteArrayInputStream(zeroBytes.getBytes(StandardCharsets.UTF_8));
    public void testStateReadGivenSpacesAndNewLineCharactersFollowedByZeroByte() throws IOException {
        Function<String, InputStream> stringToInputStream = s -> new ByteArrayInputStream(s.getBytes(StandardCharsets.UTF_8));

        stateProcessor.process(stream);
        stateProcessor.process(stringToInputStream.apply("\0"));
        stateProcessor.process(stringToInputStream.apply(" \0"));
        stateProcessor.process(stringToInputStream.apply("\n\0"));
        stateProcessor.process(stringToInputStream.apply(" \0"));
        stateProcessor.process(stringToInputStream.apply(" \n \0"));
        stateProcessor.process(stringToInputStream.apply(" \n\n \0"));
        stateProcessor.process(stringToInputStream.apply(" \n \n \0"));
        stateProcessor.process(stringToInputStream.apply(" \n \n \0"));
        stateProcessor.process(stringToInputStream.apply("\n \n \0"));

        verify(stateProcessor, times(1)).persist(any());
        Mockito.verifyNoMoreInteractions(resultsPersisterService);
        verify(stateProcessor, never()).persist(any(), any());
    }

    public void testStateReadGivenNoIndexField() throws IOException {
        String bytes = " \n \n \n \n\n {}\0";
        ByteArrayInputStream stream = new ByteArrayInputStream(bytes.getBytes(StandardCharsets.UTF_8));

        Exception e = expectThrows(IllegalStateException.class, () -> stateProcessor.process(stream));
        assertThat(e.getMessage(), containsString("Could not extract \"index\" field"));

        verify(stateProcessor, never()).persist(any(), any());
    }

    public void testStateReadGivenNoIdField() throws IOException {
        String bytes = " \n \n \n {\"index\": {}}\0";
        ByteArrayInputStream stream = new ByteArrayInputStream(bytes.getBytes(StandardCharsets.UTF_8));

        Exception e = expectThrows(IllegalStateException.class, () -> stateProcessor.process(stream));
        assertThat(e.getMessage(), containsString("Could not extract \"index._id\" field"));

        verify(stateProcessor, never()).persist(any(), any());
    }

    /**
@@ -113,9 +168,11 @@ public class IndexingStateProcessorTests extends ESTestCase {
     */
    @Timeout(millis = 10 * 1000)
    public void testLargeStateRead() throws Exception {
        when(searchResponse.getHits()).thenReturn(SearchHits.empty());

        StringBuilder builder = new StringBuilder(NUM_LARGE_DOCS * (LARGE_DOC_SIZE + 10)); // 10 for header and separators
        for (int docNum = 1; docNum <= NUM_LARGE_DOCS; ++docNum) {
            builder.append("{\"index\":{\"_index\":\"header").append(docNum).append("\"}}\n");
            builder.append("{\"index\":{\"_index\":\"header").append(docNum).append("\",\"_id\":\"doc").append(docNum).append("\"}}\n");
            for (int count = 0; count < (LARGE_DOC_SIZE / "data".length()); ++count) {
                builder.append("data");
            }
@@ -124,7 +181,8 @@ public class IndexingStateProcessorTests extends ESTestCase {

        ByteArrayInputStream stream = new ByteArrayInputStream(builder.toString().getBytes(StandardCharsets.UTF_8));
        stateProcessor.process(stream);
        verify(stateProcessor, times(NUM_LARGE_DOCS)).persist(any());
        verify(stateProcessor, times(NUM_LARGE_DOCS)).persist(eq(".ml-state-write"), any());
        verify(resultsPersisterService, times(NUM_LARGE_DOCS)).searchWithRetry(any(SearchRequest.class), any(), any(), any());
        verify(resultsPersisterService, times(NUM_LARGE_DOCS)).bulkIndexWithRetry(any(BulkRequest.class), any(), any(), any());
    }
}
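
The tests above feed the processor '\0'-delimited chunks, each consisting of a bulk action line plus the state document body, and verify that runs of consecutive zero bytes or blank lines produce no persist calls. As a reading aid only (not part of the commit; ZeroByteFramingSketch and splitOnZeroBytes are invented names), here is a self-contained sketch of that framing and of the "ignore completely empty chunks" behaviour:

// Reading aid only, not part of the commit. Assumptions: plain JDK; ZeroByteFramingSketch and
// splitOnZeroBytes are invented names. The sketch mirrors the '\0'-delimited framing the tests
// above feed into IndexingStateProcessor.process(): each chunk is a bulk action line plus the
// state document body, and completely empty chunks (consecutive zero bytes) are ignored.
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class ZeroByteFramingSketch {

    // Splits a state stream on zero bytes, dropping chunks that contain no bytes at all.
    static List<String> splitOnZeroBytes(byte[] stream) {
        List<String> chunks = new ArrayList<>();
        int splitFrom = 0;
        for (int i = 0; i < stream.length; i++) {
            if (stream[i] == 0) {
                if (i > splitFrom) { // ignore completely empty chunks
                    chunks.add(new String(stream, splitFrom, i - splitFrom, StandardCharsets.UTF_8));
                }
                splitFrom = i + 1;
            }
        }
        return chunks;
    }

    public static void main(String[] args) {
        String sample = "{\"index\": {\"_id\": \"1\"}}\n{\"field\": \"v1\"}\n\0"
            + "\0\0"   // consecutive zero bytes yield no chunks
            + "{\"index\": {\"_id\": \"2\"}}\n{\"field\": \"v2\"}\n\0";
        List<String> chunks = splitOnZeroBytes(sample.getBytes(StandardCharsets.UTF_8));
        System.out.println(chunks.size() + " chunks");   // prints "2 chunks"
        chunks.forEach(System.out::println);
    }
}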