[ML-DataFrame] adapt page size on circuit breaker responses (#41149)
handle circuit breaker response and adapt page size to reduce memory pressure, reduce preview buckets to 100, initial page size to 500
This commit is contained in:
parent
043c1f5d42
commit
02247cc7df
|
@ -56,6 +56,11 @@ public class DataFrameMessages {
|
|||
"Failed to parse group_by for data frame pivot transform";
|
||||
public static final String LOG_DATA_FRAME_TRANSFORM_CONFIGURATION_BAD_AGGREGATION =
|
||||
"Failed to parse aggregation for data frame pivot transform";
|
||||
public static final String LOG_DATA_FRAME_TRANSFORM_PIVOT_REDUCE_PAGE_SIZE =
|
||||
"Search returned with out of memory error, reducing number of buckets per search from [{0}] to [{1}]";
|
||||
public static final String LOG_DATA_FRAME_TRANSFORM_PIVOT_LOW_PAGE_SIZE_FAILURE =
|
||||
"Search returned with out of memory error after repeated page size reductions to [{0}], unable to continue pivot, "
|
||||
+ "please simplify job or increase heap size on data nodes.";
|
||||
|
||||
public static final String FAILED_TO_PARSE_TRANSFORM_CHECKPOINTS =
|
||||
"Failed to parse transform checkpoints for [{0}]";
|
||||
|
|
|
@ -260,7 +260,8 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
|
|||
createPreviewRequest.setJsonEntity(config);
|
||||
Map<String, Object> previewDataframeResponse = entityAsMap(client().performRequest(createPreviewRequest));
|
||||
List<Map<String, Object>> preview = (List<Map<String, Object>>)previewDataframeResponse.get("preview");
|
||||
assertThat(preview.size(), equalTo(393));
|
||||
// preview is limited to 100
|
||||
assertThat(preview.size(), equalTo(100));
|
||||
Set<String> expectedFields = new HashSet<>(Arrays.asList("reviewer", "by_day", "avg_rating"));
|
||||
preview.forEach(p -> {
|
||||
Set<String> keys = p.keySet();
|
||||
|
|
|
@ -35,6 +35,7 @@ import static org.elasticsearch.xpack.dataframe.transforms.DataFrameIndexer.COMP
|
|||
public class TransportPreviewDataFrameTransformAction extends
|
||||
HandledTransportAction<PreviewDataFrameTransformAction.Request, PreviewDataFrameTransformAction.Response> {
|
||||
|
||||
private static final int NUMBER_OF_PREVIEW_BUCKETS = 100;
|
||||
private final XPackLicenseState licenseState;
|
||||
private final Client client;
|
||||
private final ThreadPool threadPool;
|
||||
|
@ -77,7 +78,7 @@ public class TransportPreviewDataFrameTransformAction extends
|
|||
ClientHelper.DATA_FRAME_ORIGIN,
|
||||
client,
|
||||
SearchAction.INSTANCE,
|
||||
pivot.buildSearchRequest(null),
|
||||
pivot.buildSearchRequest(null, NUMBER_OF_PREVIEW_BUCKETS),
|
||||
ActionListener.wrap(
|
||||
r -> {
|
||||
final CompositeAggregation agg = r.getAggregations().get(COMPOSITE_AGGREGATION_NAME);
|
||||
|
|
|
@ -8,14 +8,21 @@ package org.elasticsearch.xpack.dataframe.transforms;
|
|||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.search.SearchPhaseExecutionException;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.search.ShardSearchFailure;
|
||||
import org.elasticsearch.common.breaker.CircuitBreakingException;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
|
||||
import org.elasticsearch.xpack.core.common.notifications.Auditor;
|
||||
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
|
||||
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
|
||||
import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage;
|
||||
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
|
||||
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
|
||||
import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer;
|
||||
|
@ -26,6 +33,7 @@ import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
|
|||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -35,22 +43,34 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
|||
|
||||
public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String, Object>, DataFrameIndexerTransformStats> {
|
||||
|
||||
public static final int MINIMUM_PAGE_SIZE = 10;
|
||||
public static final String COMPOSITE_AGGREGATION_NAME = "_data_frame";
|
||||
private static final Logger logger = LogManager.getLogger(DataFrameIndexer.class);
|
||||
|
||||
protected final Auditor<DataFrameAuditMessage> auditor;
|
||||
|
||||
private Pivot pivot;
|
||||
private int pageSize = 0;
|
||||
|
||||
public DataFrameIndexer(Executor executor,
|
||||
Auditor<DataFrameAuditMessage> auditor,
|
||||
AtomicReference<IndexerState> initialState,
|
||||
Map<String, Object> initialPosition,
|
||||
DataFrameIndexerTransformStats jobStats) {
|
||||
super(executor, initialState, initialPosition, jobStats);
|
||||
this.auditor = Objects.requireNonNull(auditor);
|
||||
}
|
||||
|
||||
protected abstract DataFrameTransformConfig getConfig();
|
||||
|
||||
protected abstract Map<String, String> getFieldMappings();
|
||||
|
||||
protected abstract void failIndexer(String message);
|
||||
|
||||
public int getPageSize() {
|
||||
return pageSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Request a checkpoint
|
||||
*/
|
||||
|
@ -62,6 +82,11 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
|
|||
QueryBuilder queryBuilder = getConfig().getSource().getQueryConfig().getQuery();
|
||||
pivot = new Pivot(getConfig().getSource().getIndex(), queryBuilder, getConfig().getPivotConfig());
|
||||
|
||||
// if we haven't set the page size yet, if it is set we might have reduced it after running into an out of memory
|
||||
if (pageSize == 0) {
|
||||
pageSize = pivot.getInitialPageSize();
|
||||
}
|
||||
|
||||
// if run for the 1st time, create checkpoint
|
||||
if (getPosition() == null) {
|
||||
createCheckpoint(listener);
|
||||
|
@ -73,6 +98,12 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onFinish(ActionListener<Void> listener) {
|
||||
// reset the page size, so we do not memorize a low page size forever, the pagesize will be re-calculated on start
|
||||
pageSize = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected IterationResult<Map<String, Object>> doProcess(SearchResponse searchResponse) {
|
||||
final CompositeAggregation agg = searchResponse.getAggregations().get(COMPOSITE_AGGREGATION_NAME);
|
||||
|
@ -121,6 +152,70 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
|
|||
|
||||
@Override
|
||||
protected SearchRequest buildSearchRequest() {
|
||||
return pivot.buildSearchRequest(getPosition());
|
||||
return pivot.buildSearchRequest(getPosition(), pageSize);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle the circuit breaking case: A search consumed to much memory and got aborted.
|
||||
*
|
||||
* Going out of memory we smoothly reduce the page size which reduces memory consumption.
|
||||
*
|
||||
* Implementation details: We take the values from the circuit breaker as a hint, but
|
||||
* note that it breaks early, that's why we also reduce using
|
||||
*
|
||||
* @param e Exception thrown, only {@link CircuitBreakingException} are handled
|
||||
* @return true if exception was handled, false if not
|
||||
*/
|
||||
protected boolean handleCircuitBreakingException(Exception e) {
|
||||
CircuitBreakingException circuitBreakingException = getCircuitBreakingException(e);
|
||||
|
||||
if (circuitBreakingException == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
double reducingFactor = Math.min((double) circuitBreakingException.getByteLimit() / circuitBreakingException.getBytesWanted(),
|
||||
1 - (Math.log10(pageSize) * 0.1));
|
||||
|
||||
int newPageSize = (int) Math.round(reducingFactor * pageSize);
|
||||
|
||||
if (newPageSize < MINIMUM_PAGE_SIZE) {
|
||||
String message = DataFrameMessages.getMessage(DataFrameMessages.LOG_DATA_FRAME_TRANSFORM_PIVOT_LOW_PAGE_SIZE_FAILURE, pageSize);
|
||||
failIndexer(message);
|
||||
return true;
|
||||
}
|
||||
|
||||
String message = DataFrameMessages.getMessage(DataFrameMessages.LOG_DATA_FRAME_TRANSFORM_PIVOT_REDUCE_PAGE_SIZE, pageSize,
|
||||
newPageSize);
|
||||
auditor.info(getJobId(), message);
|
||||
logger.info("Data frame transform [" + getJobId() + "]:" + message);
|
||||
|
||||
pageSize = newPageSize;
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Inspect exception for circuit breaking exception and return the first one it can find.
|
||||
*
|
||||
* @param e Exception
|
||||
* @return CircuitBreakingException instance if found, null otherwise
|
||||
*/
|
||||
private static CircuitBreakingException getCircuitBreakingException(Exception e) {
|
||||
// circuit breaking exceptions are at the bottom
|
||||
Throwable unwrappedThrowable = ExceptionsHelper.unwrapCause(e);
|
||||
|
||||
if (unwrappedThrowable instanceof CircuitBreakingException) {
|
||||
return (CircuitBreakingException) unwrappedThrowable;
|
||||
} else if (unwrappedThrowable instanceof SearchPhaseExecutionException) {
|
||||
SearchPhaseExecutionException searchPhaseException = (SearchPhaseExecutionException) e;
|
||||
for (ShardSearchFailure shardFailure : searchPhaseException.shardFailures()) {
|
||||
Throwable unwrappedShardFailure = ExceptionsHelper.unwrapCause(shardFailure.getCause());
|
||||
|
||||
if (unwrappedShardFailure instanceof CircuitBreakingException) {
|
||||
return (CircuitBreakingException) unwrappedShardFailure;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -277,25 +277,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
|
|||
));
|
||||
}
|
||||
|
||||
private boolean isIrrecoverableFailure(Exception e) {
|
||||
return e instanceof IndexNotFoundException || e instanceof DataFrameConfigurationException;
|
||||
}
|
||||
|
||||
synchronized void handleFailure(Exception e) {
|
||||
if (isIrrecoverableFailure(e) || failureCount.incrementAndGet() > MAX_CONTINUOUS_FAILURES) {
|
||||
String failureMessage = isIrrecoverableFailure(e) ?
|
||||
"task encountered irrecoverable failure: " + e.getMessage() :
|
||||
"task encountered more than " + MAX_CONTINUOUS_FAILURES + " failures; latest failure: " + e.getMessage();
|
||||
auditor.error(transform.getId(), failureMessage);
|
||||
stateReason.set(failureMessage);
|
||||
taskState.set(DataFrameTransformTaskState.FAILED);
|
||||
persistStateToClusterState(getState(), ActionListener.wrap(
|
||||
r -> failureCount.set(0), // Successfully marked as failed, reset counter so that task can be restarted
|
||||
exception -> {} // Noop, internal method logs the failure to update the state
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This is called when the persistent task signals that the allocated task should be terminated.
|
||||
* Termination in the task framework is essentially voluntary, as the allocated task can only be
|
||||
|
@ -313,13 +294,11 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
|
|||
|
||||
protected class ClientDataFrameIndexer extends DataFrameIndexer {
|
||||
private static final int LOAD_TRANSFORM_TIMEOUT_IN_SECONDS = 30;
|
||||
private static final int CREATE_CHECKPOINT_TIMEOUT_IN_SECONDS = 30;
|
||||
|
||||
private final Client client;
|
||||
private final DataFrameTransformsConfigManager transformsConfigManager;
|
||||
private final DataFrameTransformsCheckpointService transformsCheckpointService;
|
||||
private final String transformId;
|
||||
private final Auditor<DataFrameAuditMessage> auditor;
|
||||
private volatile DataFrameIndexerTransformStats previouslyPersistedStats = null;
|
||||
// Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index
|
||||
private volatile String lastAuditedExceptionMessage = null;
|
||||
|
@ -331,13 +310,12 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
|
|||
DataFrameTransformsCheckpointService transformsCheckpointService,
|
||||
AtomicReference<IndexerState> initialState, Map<String, Object> initialPosition, Client client,
|
||||
Auditor<DataFrameAuditMessage> auditor) {
|
||||
super(threadPool.executor(ThreadPool.Names.GENERIC), initialState, initialPosition,
|
||||
super(threadPool.executor(ThreadPool.Names.GENERIC), auditor, initialState, initialPosition,
|
||||
new DataFrameIndexerTransformStats(transformId));
|
||||
this.transformId = transformId;
|
||||
this.transformsConfigManager = transformsConfigManager;
|
||||
this.transformsCheckpointService = transformsCheckpointService;
|
||||
this.client = client;
|
||||
this.auditor = auditor;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -474,19 +452,26 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
|
|||
|
||||
@Override
|
||||
protected void onFailure(Exception exc) {
|
||||
// the failure handler must not throw an exception due to internal problems
|
||||
try {
|
||||
logger.warn("Data frame transform [" + transform.getId() + "] encountered an exception: ", exc);
|
||||
|
||||
// Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
|
||||
// times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
|
||||
if (exc.getMessage().equals(lastAuditedExceptionMessage) == false) {
|
||||
auditor.warning(transform.getId(), "Data frame transform encountered an exception: " + exc.getMessage());
|
||||
lastAuditedExceptionMessage = exc.getMessage();
|
||||
}
|
||||
logger.warn("Data frame transform [" + transform.getId() + "] encountered an exception: ", exc);
|
||||
handleFailure(exc);
|
||||
} catch (Exception e) {
|
||||
logger.error("Data frame transform encountered an unexpected internal exception: " ,e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onFinish(ActionListener<Void> listener) {
|
||||
try {
|
||||
super.onFinish(listener);
|
||||
long checkpoint = currentCheckpoint.incrementAndGet();
|
||||
auditor.info(transform.getId(), "Finished indexing for data frame transform checkpoint [" + checkpoint + "]");
|
||||
logger.info("Finished indexing for data frame transform [" + transform.getId() + "] checkpoint [" + checkpoint + "]");
|
||||
|
@ -515,6 +500,35 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
|
|||
listener.onFailure(new RuntimeException("Failed to retrieve checkpoint", getCheckPointException));
|
||||
}));
|
||||
}
|
||||
|
||||
private boolean isIrrecoverableFailure(Exception e) {
|
||||
return e instanceof IndexNotFoundException || e instanceof DataFrameConfigurationException;
|
||||
}
|
||||
|
||||
synchronized void handleFailure(Exception e) {
|
||||
if (handleCircuitBreakingException(e)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (isIrrecoverableFailure(e) || failureCount.incrementAndGet() > MAX_CONTINUOUS_FAILURES) {
|
||||
String failureMessage = isIrrecoverableFailure(e) ?
|
||||
"task encountered irrecoverable failure: " + e.getMessage() :
|
||||
"task encountered more than " + MAX_CONTINUOUS_FAILURES + " failures; latest failure: " + e.getMessage();
|
||||
failIndexer(failureMessage);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void failIndexer(String failureMessage) {
|
||||
logger.error("Data frame transform [" + getJobId() + "]:" + failureMessage);
|
||||
auditor.error(transform.getId(), failureMessage);
|
||||
stateReason.set(failureMessage);
|
||||
taskState.set(DataFrameTransformTaskState.FAILED);
|
||||
persistStateToClusterState(DataFrameTransformTask.this.getState(), ActionListener.wrap(
|
||||
r -> failureCount.set(0), // Successfully marked as failed, reset counter so that task can be restarted
|
||||
exception -> {} // Noop, internal method logs the failure to update the state
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
class DataFrameConfigurationException extends RuntimeException {
|
||||
|
|
|
@ -35,6 +35,8 @@ import java.util.stream.Stream;
|
|||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
|
||||
public class Pivot {
|
||||
public static final int DEFAULT_INITIAL_PAGE_SIZE = 500;
|
||||
|
||||
private static final String COMPOSITE_AGGREGATION_NAME = "_data_frame";
|
||||
|
||||
private final PivotConfig config;
|
||||
|
@ -68,11 +70,29 @@ public class Pivot {
|
|||
SchemaUtil.deduceMappings(client, config, source, listener);
|
||||
}
|
||||
|
||||
public SearchRequest buildSearchRequest(Map<String, Object> position) {
|
||||
/**
|
||||
* Get the initial page size for this pivot.
|
||||
*
|
||||
* The page size is the main parameter for adjusting memory consumption. Memory consumption mainly depends on
|
||||
* the page size, the type of aggregations and the data. As the page size is the number of buckets we return
|
||||
* per page the page size is a multiplier for the costs of aggregating bucket.
|
||||
*
|
||||
* Initially this returns a default, in future it might inspect the configuration and base the initial size
|
||||
* on the aggregations used.
|
||||
*
|
||||
* @return the page size
|
||||
*/
|
||||
public int getInitialPageSize() {
|
||||
return DEFAULT_INITIAL_PAGE_SIZE;
|
||||
}
|
||||
|
||||
public SearchRequest buildSearchRequest(Map<String, Object> position, int pageSize) {
|
||||
if (position != null) {
|
||||
cachedCompositeAggregation.aggregateAfter(position);
|
||||
}
|
||||
|
||||
cachedCompositeAggregation.size(pageSize);
|
||||
|
||||
return cachedSearchRequest;
|
||||
}
|
||||
|
||||
|
@ -127,7 +147,6 @@ public class Pivot {
|
|||
XContentParser parser = builder.generator().contentType().xContent().createParser(NamedXContentRegistry.EMPTY,
|
||||
LoggingDeprecationHandler.INSTANCE, BytesReference.bytes(builder).streamInput());
|
||||
compositeAggregation = CompositeAggregationBuilder.parse(COMPOSITE_AGGREGATION_NAME, parser);
|
||||
compositeAggregation.size(1000);
|
||||
config.getAggregationConfig().getAggregatorFactories().forEach(agg -> compositeAggregation.subAggregation(agg));
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(DataFrameMessages.DATA_FRAME_TRANSFORM_PIVOT_FAILED_TO_CREATE_COMPOSITE_AGGREGATION, e);
|
||||
|
|
|
@ -0,0 +1,239 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.dataframe.transforms;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.search.SearchPhaseExecutionException;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.search.ShardSearchFailure;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.breaker.CircuitBreaker.Durability;
|
||||
import org.elasticsearch.common.breaker.CircuitBreakingException;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.common.notifications.Auditor;
|
||||
import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage;
|
||||
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
|
||||
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
|
||||
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
|
||||
import org.elasticsearch.xpack.core.indexing.IndexerState;
|
||||
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class DataFrameIndexerTests extends ESTestCase {
|
||||
|
||||
private Client client;
|
||||
private static final String TEST_ORIGIN = "test_origin";
|
||||
private static final String TEST_INDEX = "test_index";
|
||||
|
||||
class MockedDataFrameIndexer extends DataFrameIndexer {
|
||||
|
||||
private final DataFrameTransformConfig transformConfig;
|
||||
private final Map<String, String> fieldMappings;
|
||||
private final Function<SearchRequest, SearchResponse> searchFunction;
|
||||
private final Function<BulkRequest, BulkResponse> bulkFunction;
|
||||
private final Consumer<Exception> failureConsumer;
|
||||
|
||||
// used for synchronizing with the test
|
||||
private CountDownLatch latch;
|
||||
|
||||
MockedDataFrameIndexer(
|
||||
Executor executor,
|
||||
DataFrameTransformConfig transformConfig,
|
||||
Map<String, String> fieldMappings,
|
||||
Auditor<DataFrameAuditMessage> auditor,
|
||||
AtomicReference<IndexerState> initialState,
|
||||
Map<String, Object> initialPosition,
|
||||
DataFrameIndexerTransformStats jobStats,
|
||||
Function<SearchRequest, SearchResponse> searchFunction,
|
||||
Function<BulkRequest, BulkResponse> bulkFunction,
|
||||
Consumer<Exception> failureConsumer) {
|
||||
super(executor, auditor, initialState, initialPosition, jobStats);
|
||||
this.transformConfig = Objects.requireNonNull(transformConfig);
|
||||
this.fieldMappings = Objects.requireNonNull(fieldMappings);
|
||||
this.searchFunction = searchFunction;
|
||||
this.bulkFunction = bulkFunction;
|
||||
this.failureConsumer = failureConsumer;
|
||||
}
|
||||
|
||||
public CountDownLatch newLatch(int count) {
|
||||
return latch = new CountDownLatch(count);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DataFrameTransformConfig getConfig() {
|
||||
return transformConfig;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, String> getFieldMappings() {
|
||||
return fieldMappings;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void createCheckpoint(ActionListener<Void> listener) {
|
||||
listener.onResponse(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getJobId() {
|
||||
return transformConfig.getId();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
|
||||
assert latch != null;
|
||||
try {
|
||||
latch.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
|
||||
try {
|
||||
SearchResponse response = searchFunction.apply(request);
|
||||
nextPhase.onResponse(response);
|
||||
} catch (Exception e) {
|
||||
nextPhase.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> nextPhase) {
|
||||
assert latch != null;
|
||||
try {
|
||||
latch.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
|
||||
try {
|
||||
BulkResponse response = bulkFunction.apply(request);
|
||||
nextPhase.onResponse(response);
|
||||
} catch (Exception e) {
|
||||
nextPhase.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doSaveState(IndexerState state, Map<String, Object> position, Runnable next) {
|
||||
assert state == IndexerState.STARTED || state == IndexerState.INDEXING || state == IndexerState.STOPPED;
|
||||
next.run();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onFailure(Exception exc) {
|
||||
try {
|
||||
// mimic same behavior as {@link DataFrameTransformTask}
|
||||
if (handleCircuitBreakingException(exc)) {
|
||||
return;
|
||||
}
|
||||
|
||||
failureConsumer.accept(exc);
|
||||
} catch (Exception e) {
|
||||
fail("Internal error: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onFinish(ActionListener<Void> listener) {
|
||||
super.onFinish(listener);
|
||||
listener.onResponse(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onAbort() {
|
||||
fail("onAbort should not be called");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void failIndexer(String message) {
|
||||
fail("failIndexer should not be called, received error: " + message);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUpMocks() {
|
||||
client = mock(Client.class);
|
||||
ThreadPool threadPool = mock(ThreadPool.class);
|
||||
when(client.threadPool()).thenReturn(threadPool);
|
||||
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
|
||||
}
|
||||
|
||||
public void testPageSizeAdapt() throws InterruptedException {
|
||||
DataFrameTransformConfig config = DataFrameTransformConfigTests.randomDataFrameTransformConfig();
|
||||
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
|
||||
|
||||
Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> {
|
||||
throw new SearchPhaseExecutionException("query", "Partial shards failure", new ShardSearchFailure[] {
|
||||
new ShardSearchFailure(new CircuitBreakingException("to much memory", 110, 100, Durability.TRANSIENT)) });
|
||||
};
|
||||
|
||||
Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);
|
||||
|
||||
Consumer<Exception> failureConsumer = e -> {
|
||||
fail("expected circuit breaker exception to be handled");
|
||||
};
|
||||
|
||||
final ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||
try {
|
||||
Auditor<DataFrameAuditMessage> auditor = new Auditor<>(client, "node_1", TEST_INDEX, TEST_ORIGIN,
|
||||
DataFrameAuditMessage.builder());
|
||||
|
||||
MockedDataFrameIndexer indexer = new MockedDataFrameIndexer(executor, config, Collections.emptyMap(), auditor, state, null,
|
||||
new DataFrameIndexerTransformStats(config.getId()), searchFunction, bulkFunction, failureConsumer);
|
||||
final CountDownLatch latch = indexer.newLatch(1);
|
||||
indexer.start();
|
||||
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
|
||||
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
|
||||
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
|
||||
latch.countDown();
|
||||
awaitBusy(() -> indexer.getState() == IndexerState.STOPPED);
|
||||
long pageSizeAfterFirstReduction = indexer.getPageSize();
|
||||
assertTrue(Pivot.DEFAULT_INITIAL_PAGE_SIZE > pageSizeAfterFirstReduction);
|
||||
assertTrue(pageSizeAfterFirstReduction > DataFrameIndexer.MINIMUM_PAGE_SIZE);
|
||||
|
||||
// run indexer a 2nd time
|
||||
final CountDownLatch secondRunLatch = indexer.newLatch(1);
|
||||
indexer.start();
|
||||
assertEquals(pageSizeAfterFirstReduction, indexer.getPageSize());
|
||||
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
|
||||
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
|
||||
assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
|
||||
secondRunLatch.countDown();
|
||||
awaitBusy(() -> indexer.getState() == IndexerState.STOPPED);
|
||||
|
||||
// assert that page size has been reduced again
|
||||
assertTrue(pageSizeAfterFirstReduction > indexer.getPageSize());
|
||||
assertTrue(pageSizeAfterFirstReduction > DataFrameIndexer.MINIMUM_PAGE_SIZE);
|
||||
|
||||
} finally {
|
||||
executor.shutdownNow();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue