From 02247cc7df46da7e233410f613c30607d1dd7be3 Mon Sep 17 00:00:00 2001
From: Hendrik Muhs
Date: Tue, 16 Apr 2019 18:53:36 +0200
Subject: [PATCH] [ML-DataFrame] adapt page size on circuit breaker responses
 (#41149)

handle circuit breaker responses and adapt the page size to reduce memory
pressure; reduce preview buckets to 100 and the initial page size to 500
---
 .../core/dataframe/DataFrameMessages.java     |   5 +
 .../integration/DataFramePivotRestIT.java     |   3 +-
 ...nsportPreviewDataFrameTransformAction.java |   3 +-
 .../transforms/DataFrameIndexer.java          |  97 ++++++-
 .../transforms/DataFrameTransformTask.java    |  74 +++---
 .../dataframe/transforms/pivot/Pivot.java     |  23 +-
 .../transforms/DataFrameIndexerTests.java     | 239 ++++++++++++++++++
 7 files changed, 409 insertions(+), 35 deletions(-)
 create mode 100644 x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java

diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java
index dbe789ca3ae..86dce1b3314 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java
@@ -56,6 +56,11 @@ public class DataFrameMessages {
             "Failed to parse group_by for data frame pivot transform";
     public static final String LOG_DATA_FRAME_TRANSFORM_CONFIGURATION_BAD_AGGREGATION =
             "Failed to parse aggregation for data frame pivot transform";
+    public static final String LOG_DATA_FRAME_TRANSFORM_PIVOT_REDUCE_PAGE_SIZE =
+            "Search returned with out of memory error, reducing number of buckets per search from [{0}] to [{1}]";
+    public static final String LOG_DATA_FRAME_TRANSFORM_PIVOT_LOW_PAGE_SIZE_FAILURE =
+            "Search returned with out of memory error after repeated page size reductions to [{0}], unable to continue pivot, "
+                    + "please simplify job or increase heap size on data nodes.";
 
     public static final String FAILED_TO_PARSE_TRANSFORM_CHECKPOINTS =
             "Failed to parse transform checkpoints for [{0}]";
diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java
index 0d14851aa7c..6ff97e1ed9d 100644
--- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java
+++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java
@@ -260,7 +260,8 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
         createPreviewRequest.setJsonEntity(config);
         Map<String, Object> previewDataframeResponse = entityAsMap(client().performRequest(createPreviewRequest));
         List<Map<String, Object>> preview = (List<Map<String, Object>>)previewDataframeResponse.get("preview");
-        assertThat(preview.size(), equalTo(393));
+        // preview is limited to 100
+        assertThat(preview.size(), equalTo(100));
         Set<String> expectedFields = new HashSet<>(Arrays.asList("reviewer", "by_day", "avg_rating"));
         preview.forEach(p -> {
             Set<String> keys = p.keySet();
diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java
index 2a4ba47f507..b65830f72e7 100644
--- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java
+++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java
@@ -35,6 +35,7 @@ import static org.elasticsearch.xpack.dataframe.transforms.DataFrameIndexer.COMPOSITE_AGGREGATION_NAME;
 
 public class TransportPreviewDataFrameTransformAction extends
         HandledTransportAction<PreviewDataFrameTransformAction.Request, PreviewDataFrameTransformAction.Response> {
+    private static final int NUMBER_OF_PREVIEW_BUCKETS = 100;
     private final XPackLicenseState licenseState;
     private final Client client;
     private final ThreadPool threadPool;
@@ -77,7 +78,7 @@ public class TransportPreviewDataFrameTransformAction extends
                 ClientHelper.DATA_FRAME_ORIGIN,
                 client,
                 SearchAction.INSTANCE,
-                pivot.buildSearchRequest(null),
+                pivot.buildSearchRequest(null, NUMBER_OF_PREVIEW_BUCKETS),
                 ActionListener.wrap(
                     r -> {
                         final CompositeAggregation agg = r.getAggregations().get(COMPOSITE_AGGREGATION_NAME);
diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java
index c781d05f189..c670f32740c 100644
--- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java
+++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java
@@ -8,14 +8,21 @@ package org.elasticsearch.xpack.dataframe.transforms;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchPhaseExecutionException;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.ShardSearchFailure;
+import org.elasticsearch.common.breaker.CircuitBreakingException;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
+import org.elasticsearch.xpack.core.common.notifications.Auditor;
 import org.elasticsearch.xpack.core.dataframe.DataFrameField;
+import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
+import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
 import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer;
@@ -26,6 +33,7 @@ import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -35,22 +43,34 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 
 public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String, Object>, DataFrameIndexerTransformStats> {
 
+    public static final int MINIMUM_PAGE_SIZE = 10;
     public static final String COMPOSITE_AGGREGATION_NAME = "_data_frame";
     private static final Logger logger = LogManager.getLogger(DataFrameIndexer.class);
 
+    protected final Auditor<DataFrameAuditMessage> auditor;
+
     private Pivot pivot;
+    private int pageSize = 0;
 
     public DataFrameIndexer(Executor executor,
+                            Auditor<DataFrameAuditMessage> auditor,
                             AtomicReference<IndexerState> initialState,
                             Map<String, Object> initialPosition,
                             DataFrameIndexerTransformStats jobStats) {
         super(executor, initialState, initialPosition, jobStats);
+        this.auditor = Objects.requireNonNull(auditor);
     }
 
     protected abstract DataFrameTransformConfig getConfig();
 
     protected abstract Map<String, String> getFieldMappings();
 
+    protected abstract void failIndexer(String message);
+
+    public int getPageSize() {
+        return pageSize;
+    }
+
     /**
      * Request a checkpoint
      */
@@ -62,6 +82,11 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String, Object>, DataFrameIndexerTransformStats> {
+    @Override
+    protected void onFinish(ActionListener<Void> listener) {
+        // reset the page size, so we do not memorize a low page size forever; the page size will be re-calculated on start
+        pageSize = 0;
+    }
+
 
     @Override
     protected IterationResult<Map<String, Object>> doProcess(SearchResponse searchResponse) {
         final CompositeAggregation agg = searchResponse.getAggregations().get(COMPOSITE_AGGREGATION_NAME);
@@ -121,6 +152,70 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String, Object>, DataFrameIndexerTransformStats> {
diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java
--- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java
+++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java
-    private boolean isIrrecoverableFailure(Exception e) {
-        return e instanceof IndexNotFoundException || e instanceof DataFrameConfigurationException;
-    }
-
-    synchronized void handleFailure(Exception e) {
-        if (isIrrecoverableFailure(e) || failureCount.incrementAndGet() > MAX_CONTINUOUS_FAILURES) {
-            String failureMessage = isIrrecoverableFailure(e) ?
-                "task encountered irrecoverable failure: " + e.getMessage() :
-                "task encountered more than " + MAX_CONTINUOUS_FAILURES + " failures; latest failure: " + e.getMessage();
-            auditor.error(transform.getId(), failureMessage);
-            stateReason.set(failureMessage);
-            taskState.set(DataFrameTransformTaskState.FAILED);
-            persistStateToClusterState(getState(), ActionListener.wrap(
-                r -> failureCount.set(0), // Successfully marked as failed, reset counter so that task can be restarted
-                exception -> {} // Noop, internal method logs the failure to update the state
-            ));
-        }
-    }
-
     /**
      * This is called when the persistent task signals that the allocated task should be terminated.
      * Termination in the task framework is essentially voluntary, as the allocated task can only be
@@ -313,13 +294,11 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements SchedulerEngine.Listener {
     protected class ClientDataFrameIndexer extends DataFrameIndexer {
         private static final int LOAD_TRANSFORM_TIMEOUT_IN_SECONDS = 30;
-        private static final int CREATE_CHECKPOINT_TIMEOUT_IN_SECONDS = 30;
 
         private final Client client;
         private final DataFrameTransformsConfigManager transformsConfigManager;
         private final DataFrameTransformsCheckpointService transformsCheckpointService;
         private final String transformId;
-        private final Auditor<DataFrameAuditMessage> auditor;
 
         private volatile DataFrameIndexerTransformStats previouslyPersistedStats = null;
         // Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index
         private volatile String lastAuditedExceptionMessage = null;
@@ -331,13 +310,12 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements SchedulerEngine.Listener {
                               DataFrameTransformsCheckpointService transformsCheckpointService,
                               AtomicReference<IndexerState> initialState, Map<String, Object> initialPosition,
                               Client client, Auditor<DataFrameAuditMessage> auditor) {
-            super(threadPool.executor(ThreadPool.Names.GENERIC), initialState, initialPosition,
+            super(threadPool.executor(ThreadPool.Names.GENERIC), auditor, initialState, initialPosition,
                 new DataFrameIndexerTransformStats(transformId));
             this.transformId = transformId;
             this.transformsConfigManager = transformsConfigManager;
             this.transformsCheckpointService = transformsCheckpointService;
             this.client = client;
-            this.auditor = auditor;
         }
 
         @Override
@@ -474,19 +452,26 @@
         @Override
         protected void onFailure(Exception exc) {
-            // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
-            // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
-            if (exc.getMessage().equals(lastAuditedExceptionMessage) == false) {
-                auditor.warning(transform.getId(), "Data frame transform encountered an exception: " + exc.getMessage());
-                lastAuditedExceptionMessage = exc.getMessage();
+            // the failure handler must not throw an exception due to internal problems
+            try {
+                logger.warn("Data frame transform [" + transform.getId() + "] encountered an exception: ", exc);
+
+                // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
+                // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
+                if (exc.getMessage().equals(lastAuditedExceptionMessage) == false) {
+                    auditor.warning(transform.getId(), "Data frame transform encountered an exception: " + exc.getMessage());
+                    lastAuditedExceptionMessage = exc.getMessage();
+                }
+                handleFailure(exc);
+            } catch (Exception e) {
+                logger.error("Data frame transform encountered an unexpected internal exception: ", e);
+            }
-            }
-            logger.warn("Data frame transform [" + transform.getId() + "] encountered an exception: ", exc);
-            handleFailure(exc);
         }
 
         @Override
         protected void onFinish(ActionListener<Void> listener) {
             try {
+                super.onFinish(listener);
                 long checkpoint = currentCheckpoint.incrementAndGet();
                 auditor.info(transform.getId(), "Finished indexing for data frame transform checkpoint [" + checkpoint + "]");
                 logger.info("Finished indexing for data frame transform [" + transform.getId() + "] checkpoint [" + checkpoint + "]");
@@ -515,6 +500,35 @@
                     listener.onFailure(new RuntimeException("Failed to retrieve checkpoint", getCheckPointException));
                 }));
         }
+
+        private boolean isIrrecoverableFailure(Exception e) {
+            return e instanceof IndexNotFoundException || e instanceof DataFrameConfigurationException;
+        }
+
+        synchronized void handleFailure(Exception e) {
+            if (handleCircuitBreakingException(e)) {
+                return;
+            }
+
+            if (isIrrecoverableFailure(e) || failureCount.incrementAndGet() > MAX_CONTINUOUS_FAILURES) {
+                String failureMessage = isIrrecoverableFailure(e) ?
+                    "task encountered irrecoverable failure: " + e.getMessage() :
+                    "task encountered more than " + MAX_CONTINUOUS_FAILURES + " failures; latest failure: " + e.getMessage();
+                failIndexer(failureMessage);
+            }
+        }
+
+        @Override
+        protected void failIndexer(String failureMessage) {
+            logger.error("Data frame transform [" + getJobId() + "]: " + failureMessage);
+            auditor.error(transform.getId(), failureMessage);
+            stateReason.set(failureMessage);
+            taskState.set(DataFrameTransformTaskState.FAILED);
+            persistStateToClusterState(DataFrameTransformTask.this.getState(), ActionListener.wrap(
+                r -> failureCount.set(0), // Successfully marked as failed, reset counter so that task can be restarted
+                exception -> {} // Noop, internal method logs the failure to update the state
+            ));
+        }
     }
 
     class DataFrameConfigurationException extends RuntimeException {
diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java
index 0cf3edec162..aa63ea92e7a 100644
--- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java
+++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java
@@ -35,6 +35,8 @@ import java.util.stream.Stream;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 
 public class Pivot {
+    public static final int DEFAULT_INITIAL_PAGE_SIZE = 500;
+
     private static final String COMPOSITE_AGGREGATION_NAME = "_data_frame";
 
     private final PivotConfig config;
@@ -68,11 +70,29 @@ public class Pivot {
         SchemaUtil.deduceMappings(client, config, source, listener);
     }
 
-    public SearchRequest buildSearchRequest(Map<String, Object> position) {
+    /**
+     * Get the initial page size for this pivot.
+     *
+     * The page size is the main parameter for adjusting memory consumption. Memory consumption mainly depends on
+     * the page size, the type of aggregations and the data. As the page size is the number of buckets we return
+     * per page, the page size is a multiplier for the cost of aggregating each bucket.
+     *
+     * Initially this returns a default; in the future it might inspect the configuration and base the initial size
+     * on the aggregations used.
+     *
+     * @return the page size
+     */
+    public int getInitialPageSize() {
+        return DEFAULT_INITIAL_PAGE_SIZE;
+    }
+
+    public SearchRequest buildSearchRequest(Map<String, Object> position, int pageSize) {
         if (position != null) {
             cachedCompositeAggregation.aggregateAfter(position);
         }
 
+        cachedCompositeAggregation.size(pageSize);
+
         return cachedSearchRequest;
     }
 
@@ -127,7 +147,6 @@ public class Pivot {
             XContentParser parser = builder.generator().contentType().xContent().createParser(NamedXContentRegistry.EMPTY,
                     LoggingDeprecationHandler.INSTANCE, BytesReference.bytes(builder).streamInput());
             compositeAggregation = CompositeAggregationBuilder.parse(COMPOSITE_AGGREGATION_NAME, parser);
-            compositeAggregation.size(1000);
             config.getAggregationConfig().getAggregatorFactories().forEach(agg -> compositeAggregation.subAggregation(agg));
         } catch (IOException e) {
             throw new RuntimeException(DataFrameMessages.DATA_FRAME_TRANSFORM_PIVOT_FAILED_TO_CREATE_COMPOSITE_AGGREGATION, e);
diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java
new file mode 100644
index 00000000000..b121e8091c1
--- /dev/null
+++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java
@@ -0,0 +1,239 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */ + +package org.elasticsearch.xpack.dataframe.transforms; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.breaker.CircuitBreaker.Durability; +import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.common.notifications.Auditor; +import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests; +import org.elasticsearch.xpack.core.indexing.IndexerState; +import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot; +import org.junit.Before; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Function; + +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class DataFrameIndexerTests extends ESTestCase { + + private Client client; + private static final String TEST_ORIGIN = "test_origin"; + private static final String TEST_INDEX = "test_index"; + + class MockedDataFrameIndexer extends DataFrameIndexer { + + private final DataFrameTransformConfig transformConfig; + private final Map fieldMappings; + private final Function searchFunction; + private final Function bulkFunction; + private final Consumer failureConsumer; + + // used for synchronizing with the test + private CountDownLatch latch; + + MockedDataFrameIndexer( + Executor executor, + DataFrameTransformConfig transformConfig, + Map fieldMappings, + Auditor auditor, + AtomicReference initialState, + Map initialPosition, + DataFrameIndexerTransformStats jobStats, + Function searchFunction, + Function bulkFunction, + Consumer failureConsumer) { + super(executor, auditor, initialState, initialPosition, jobStats); + this.transformConfig = Objects.requireNonNull(transformConfig); + this.fieldMappings = Objects.requireNonNull(fieldMappings); + this.searchFunction = searchFunction; + this.bulkFunction = bulkFunction; + this.failureConsumer = failureConsumer; + } + + public CountDownLatch newLatch(int count) { + return latch = new CountDownLatch(count); + } + + @Override + protected DataFrameTransformConfig getConfig() { + return transformConfig; + } + + @Override + protected Map getFieldMappings() { + return fieldMappings; + } + + @Override + protected void createCheckpoint(ActionListener listener) { + listener.onResponse(null); + } + + @Override + 
protected String getJobId() { + return transformConfig.getId(); + } + + @Override + protected void doNextSearch(SearchRequest request, ActionListener nextPhase) { + assert latch != null; + try { + latch.await(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + + try { + SearchResponse response = searchFunction.apply(request); + nextPhase.onResponse(response); + } catch (Exception e) { + nextPhase.onFailure(e); + } + } + + @Override + protected void doNextBulk(BulkRequest request, ActionListener nextPhase) { + assert latch != null; + try { + latch.await(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + + try { + BulkResponse response = bulkFunction.apply(request); + nextPhase.onResponse(response); + } catch (Exception e) { + nextPhase.onFailure(e); + } + } + + @Override + protected void doSaveState(IndexerState state, Map position, Runnable next) { + assert state == IndexerState.STARTED || state == IndexerState.INDEXING || state == IndexerState.STOPPED; + next.run(); + } + + @Override + protected void onFailure(Exception exc) { + try { + // mimic same behavior as {@link DataFrameTransformTask} + if (handleCircuitBreakingException(exc)) { + return; + } + + failureConsumer.accept(exc); + } catch (Exception e) { + fail("Internal error: " + e.getMessage()); + } + } + + @Override + protected void onFinish(ActionListener listener) { + super.onFinish(listener); + listener.onResponse(null); + } + + @Override + protected void onAbort() { + fail("onAbort should not be called"); + } + + @Override + protected void failIndexer(String message) { + fail("failIndexer should not be called, received error: " + message); + } + + } + + @Before + public void setUpMocks() { + client = mock(Client.class); + ThreadPool threadPool = mock(ThreadPool.class); + when(client.threadPool()).thenReturn(threadPool); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + } + + public void testPageSizeAdapt() throws InterruptedException { + DataFrameTransformConfig config = DataFrameTransformConfigTests.randomDataFrameTransformConfig(); + AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); + + Function searchFunction = searchRequest -> { + throw new SearchPhaseExecutionException("query", "Partial shards failure", new ShardSearchFailure[] { + new ShardSearchFailure(new CircuitBreakingException("to much memory", 110, 100, Durability.TRANSIENT)) }); + }; + + Function bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100); + + Consumer failureConsumer = e -> { + fail("expected circuit breaker exception to be handled"); + }; + + final ExecutorService executor = Executors.newFixedThreadPool(1); + try { + Auditor auditor = new Auditor<>(client, "node_1", TEST_INDEX, TEST_ORIGIN, + DataFrameAuditMessage.builder()); + + MockedDataFrameIndexer indexer = new MockedDataFrameIndexer(executor, config, Collections.emptyMap(), auditor, state, null, + new DataFrameIndexerTransformStats(config.getId()), searchFunction, bulkFunction, failureConsumer); + final CountDownLatch latch = indexer.newLatch(1); + indexer.start(); + assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); + assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); + assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); + latch.countDown(); + awaitBusy(() -> indexer.getState() == IndexerState.STOPPED); + long pageSizeAfterFirstReduction = indexer.getPageSize(); + assertTrue(Pivot.DEFAULT_INITIAL_PAGE_SIZE > 
pageSizeAfterFirstReduction); + assertTrue(pageSizeAfterFirstReduction > DataFrameIndexer.MINIMUM_PAGE_SIZE); + + // run indexer a 2nd time + final CountDownLatch secondRunLatch = indexer.newLatch(1); + indexer.start(); + assertEquals(pageSizeAfterFirstReduction, indexer.getPageSize()); + assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); + assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); + assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); + secondRunLatch.countDown(); + awaitBusy(() -> indexer.getState() == IndexerState.STOPPED); + + // assert that page size has been reduced again + assertTrue(pageSizeAfterFirstReduction > indexer.getPageSize()); + assertTrue(pageSizeAfterFirstReduction > DataFrameIndexer.MINIMUM_PAGE_SIZE); + + } finally { + executor.shutdownNow(); + } + } +}
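
Note on the `@@ -121,6 +152,70 @@` hunk in DataFrameIndexer.java: the hunk body (the new page size adaptation that DataFrameIndexerTests exercises through handleCircuitBreakingException) is not reproduced above. Below is a minimal sketch of that logic, pieced together from the imports added to DataFrameIndexer, the MINIMUM_PAGE_SIZE constant, the two new DataFrameMessages entries, and the test's expectations. The concrete reduction factor (shrinking by at least half) and the use of the DataFrameMessages.getMessage(...) helper are assumptions, not the committed hunk.

    // Sketch: lives in DataFrameIndexer; returns true when the failure was a circuit
    // breaker trip that has been absorbed by shrinking the composite aggregation page.
    protected boolean handleCircuitBreakingException(Exception e) {
        CircuitBreakingException circuitBreakingException = getCircuitBreakingException(e);
        if (circuitBreakingException == null) {
            return false; // not memory pressure; let the regular failure handling run
        }

        // use the breaker's limit/wanted ratio as a hint, but shrink by at least half (assumed factor)
        double reducingFactor = Math.min(
                (double) circuitBreakingException.getByteLimit() / circuitBreakingException.getBytesWanted(), 0.5);
        int newPageSize = (int) Math.round(reducingFactor * pageSize);

        if (newPageSize < MINIMUM_PAGE_SIZE) {
            // repeated reductions bottomed out: give up and fail the indexer
            failIndexer(DataFrameMessages.getMessage(
                    DataFrameMessages.LOG_DATA_FRAME_TRANSFORM_PIVOT_LOW_PAGE_SIZE_FAILURE, pageSize));
            return true;
        }

        auditor.info(getJobId(), DataFrameMessages.getMessage(
                DataFrameMessages.LOG_DATA_FRAME_TRANSFORM_PIVOT_REDUCE_PAGE_SIZE, pageSize, newPageSize));
        pageSize = newPageSize;
        return true;
    }

    // A breaker trip usually surfaces wrapped, e.g. inside a SearchPhaseExecutionException
    // carrying per-shard failures (exactly what the test builds), so unwrap both levels.
    private static CircuitBreakingException getCircuitBreakingException(Exception e) {
        Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
        if (unwrapped instanceof CircuitBreakingException) {
            return (CircuitBreakingException) unwrapped;
        }
        if (unwrapped instanceof SearchPhaseExecutionException) {
            for (ShardSearchFailure shardFailure : ((SearchPhaseExecutionException) unwrapped).shardFailures()) {
                Throwable shardCause = ExceptionsHelper.unwrapCause(shardFailure.getCause());
                if (shardCause instanceof CircuitBreakingException) {
                    return (CircuitBreakingException) shardCause;
                }
            }
        }
        return null;
    }

With the test's breaker (110 bytes wanted against a 100 byte limit) this sketch takes the page from 500 to 250 on the first trip and from 250 to 125 on the second, consistent with the assertions in testPageSizeAdapt. The design pairs multiplicative decrease with a reset: onFinish() sets pageSize back to 0, so the next successful run re-derives Pivot.DEFAULT_INITIAL_PAGE_SIZE (500) via getInitialPageSize() instead of memorizing a low page size forever, while repeated trips within one run shrink the page until MINIMUM_PAGE_SIZE (10) is undercut and the indexer fails with the dedicated message.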