refactor onStart and onFinish to take runnables and executed them guarded by state (#40855)

refactor onStart and onFinish to take action listeners and execute them when indexer is in indexing state.
This commit is contained in:
Hendrik Muhs 2019-04-07 21:43:35 +02:00
parent 4163e59768
commit d5fcbf2f4a
8 changed files with 131 additions and 68 deletions

View File

@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicReference;
/**
* An abstract class that builds an index incrementally. A background job can be launched using {@link #maybeTriggerAsyncJob(long)},
* it will create the index from the source index up to the last complete bucket that is allowed to be built (based on job position).
* Only one background job can run simultaneously and {@link #onFinish()} is called when the job
* Only one background job can run simultaneously and {@link #onFinish} is called when the job
* finishes. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()} is called if the indexer is
* aborted while a job is running. The indexer must be started ({@link #start()} to allow a background job to run when
* {@link #maybeTriggerAsyncJob(long)} is called. {@link #stop()} can be used to stop the background job without aborting the indexer.
@ -85,13 +85,10 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
/**
* Sets the internal state to {@link IndexerState#STOPPING} if an async job is
* running in the background and in such case {@link #onFinish()} will be called
* as soon as the background job detects that the indexer is stopped. If there
* is no job running when this function is called, the state is directly set to
* {@link IndexerState#STOPPED} and {@link #onFinish()} will never be called.
* running in the background. If there is no job running when this function is
* called, the state is directly set to {@link IndexerState#STOPPED}.
*
* @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the
* job was already aborted).
* @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted).
*/
public synchronized IndexerState stop() {
IndexerState currentState = state.updateAndGet(previousState -> {
@ -148,17 +145,17 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
case STARTED:
logger.debug("Schedule was triggered for job [" + getJobId() + "], state: [" + currentState + "]");
stats.incrementNumInvocations(1);
onStartJob(now);
if (state.compareAndSet(IndexerState.STARTED, IndexerState.INDEXING)) {
// fire off the search. Note this is async, the method will return from here
executor.execute(() -> {
try {
onStart(now, ActionListener.wrap(r -> {
stats.markStartSearch();
doNextSearch(buildSearchRequest(), ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure));
} catch (Exception e) {
finishWithSearchFailure(e);
}
}, e -> {
finishAndSetState();
onFailure(e);
}));
});
logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]");
return true;
@ -200,8 +197,9 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
* internal state is {@link IndexerState#STARTED}.
*
* @param now The current time in milliseconds passed through from {@link #maybeTriggerAsyncJob(long)}
* @param listener listener to call after done
*/
protected abstract void onStartJob(long now);
protected abstract void onStart(long now, ActionListener<Void> listener);
/**
* Executes the {@link SearchRequest} and calls <code>nextPhase</code> with the
@ -248,9 +246,12 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
protected abstract void onFailure(Exception exc);
/**
* Called when a background job finishes.
* Called when a background job finishes before the internal state changes from {@link IndexerState#INDEXING} back to
* {@link IndexerState#STARTED}.
*
* @param listener listener to call after done
*/
protected abstract void onFinish();
protected abstract void onFinish(ActionListener<Void> listener);
/**
* Called when a background job detects that the indexer is aborted causing the
@ -315,10 +316,11 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
if (iterationResult.isDone()) {
logger.debug("Finished indexing for job [" + getJobId() + "], saving state and shutting down.");
// Change state first, then try to persist. This prevents in-progress
// STOPPING/ABORTING from
// being persisted as STARTED but then stop the job
doSaveState(finishAndSetState(), position.get(), this::onFinish);
// execute finishing tasks
onFinish(ActionListener.wrap(
r -> doSaveState(finishAndSetState(), position.get(), () -> {}),
e -> doSaveState(finishAndSetState(), position.get(), () -> {})));
return;
}
@ -337,6 +339,8 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
logger.warn("Error while attempting to bulk index documents: " + bulkResponse.buildFailureMessage());
}
stats.incrementNumOutputDocuments(bulkResponse.getItems().length);
// check if indexer has been asked to stop, state {@link IndexerState#STOPPING}
if (checkState(getState()) == false) {
return;
}

View File

@ -76,9 +76,10 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
}
@Override
protected void onStartJob(long now) {
protected void onStart(long now, ActionListener<Void> listener) {
assertThat(step, equalTo(0));
++step;
listener.onResponse(null);
}
@Override
@ -98,7 +99,7 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
@Override
protected void doSaveState(IndexerState state, Integer position, Runnable next) {
assertThat(step, equalTo(4));
assertThat(step, equalTo(5));
++step;
next.run();
}
@ -109,10 +110,11 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
}
@Override
protected void onFinish() {
assertThat(step, equalTo(5));
protected void onFinish(ActionListener<Void> listener) {
assertThat(step, equalTo(4));
++step;
isFinished.set(true);
listener.onResponse(null);
}
@Override
@ -153,9 +155,10 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
}
@Override
protected void onStartJob(long now) {
protected void onStart(long now, ActionListener<Void> listener) {
assertThat(step, equalTo(0));
++step;
listener.onResponse(null);
}
@Override
@ -170,20 +173,18 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
@Override
protected void doSaveState(IndexerState state, Integer position, Runnable next) {
assertThat(step, equalTo(2));
++step;
next.run();
fail("should not be called");
}
@Override
protected void onFailure(Exception exc) {
assertThat(step, equalTo(3));
assertThat(step, equalTo(2));
++step;
isFinished.set(true);
}
@Override
protected void onFinish() {
protected void onFinish(ActionListener<Void> listener) {
fail("should not be called");
}
@ -240,8 +241,8 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
indexer.start();
assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
assertTrue(ESTestCase.awaitBusy(() -> isFinished.get()));
assertThat(indexer.getStep(), equalTo(4));
assertTrue(ESTestCase.awaitBusy(() -> isFinished.get(), 10000, TimeUnit.SECONDS));
assertThat(indexer.getStep(), equalTo(3));
} finally {
executor.shutdownNow();

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.dataframe.transforms;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
@ -51,10 +52,14 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
protected abstract Map<String, String> getFieldMappings();
@Override
protected void onStartJob(long now) {
protected void onStart(long now, ActionListener<Void> listener) {
try {
QueryBuilder queryBuilder = getConfig().getSource().getQueryConfig().getQuery();
pivot = new Pivot(getConfig().getSource().getIndex(), queryBuilder, getConfig().getPivotConfig());
listener.onResponse(null);
} catch (Exception e) {
listener.onFailure(e);
}
}
@Override

View File

@ -27,12 +27,12 @@ import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.common.notifications.Auditor;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
@ -478,9 +478,14 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
}
@Override
protected void onFinish() {
protected void onFinish(ActionListener<Void> listener) {
try {
auditor.info(transform.getId(), "Finished indexing for data frame transform");
logger.info("Finished indexing for data frame transform [" + transform.getId() + "]");
listener.onResponse(null);
} catch (Exception e) {
listener.onFailure(e);
}
}
@Override

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.rollup.job;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.unit.TimeValue;
@ -75,7 +76,21 @@ public abstract class RollupIndexer extends AsyncTwoPhaseIndexer<Map<String, Obj
*/
RollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState, Map<String, Object> initialPosition,
AtomicBoolean upgradedDocumentID) {
super(executor, initialState, initialPosition, new RollupIndexerJobStats());
this(executor, job, initialState, initialPosition, upgradedDocumentID, new RollupIndexerJobStats());
}
/**
* Ctr
* @param executor Executor to use to fire the first request of a background job.
* @param job The rollup job
* @param initialState Initial state for the indexer
* @param initialPosition The last indexed bucket of the task
* @param upgradedDocumentID whether job has updated IDs (for BWC)
* @param jobStats jobstats instance for collecting stats
*/
RollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState, Map<String, Object> initialPosition,
AtomicBoolean upgradedDocumentID, RollupIndexerJobStats jobStats) {
super(executor, initialState, initialPosition, jobStats);
this.job = job;
this.compositeBuilder = createCompositeBuilder(job.getConfig());
this.upgradedDocumentID = upgradedDocumentID;
@ -94,7 +109,8 @@ public abstract class RollupIndexer extends AsyncTwoPhaseIndexer<Map<String, Obj
}
@Override
protected void onStartJob(long now) {
protected void onStart(long now, ActionListener<Void> listener) {
try {
// this is needed to exclude buckets that can still receive new documents.
DateHistogramGroupConfig dateHisto = job.getConfig().getGroupConfig().getDateHistogram();
long rounded = dateHisto.createRounding().round(now);
@ -104,6 +120,10 @@ public abstract class RollupIndexer extends AsyncTwoPhaseIndexer<Map<String, Obj
} else {
maxBoundary = rounded;
}
listener.onResponse(null);
} catch (Exception e) {
listener.onFailure(e);
}
}
@Override

View File

@ -138,8 +138,9 @@ public class RollupJobTask extends AllocatedPersistentTask implements SchedulerE
}
@Override
protected void onFinish() {
protected void onFinish(ActionListener<Void> listener) {
logger.debug("Finished indexing for job [" + job.getConfig().getId() + "]");
listener.onResponse(null);
}
@Override

View File

@ -580,8 +580,9 @@ public class RollupIndexerIndexingTests extends AggregatorTestCase {
}
@Override
protected void onFinish() {
protected void onFinish(ActionListener<Void> listener) {
latch.countDown();
listener.onResponse(null);
}
@Override

View File

@ -24,7 +24,7 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers;
import org.elasticsearch.xpack.core.rollup.RollupField;
import org.elasticsearch.xpack.core.rollup.job.GroupConfig;
import org.elasticsearch.xpack.core.rollup.job.RollupIndexerJobStats;
import org.elasticsearch.xpack.core.rollup.job.RollupJob;
import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig;
import org.mockito.stubbing.Answer;
@ -44,11 +44,18 @@ import java.util.function.Function;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.startsWith;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;
public class RollupIndexerStateTests extends ESTestCase {
private static class EmptyRollupIndexer extends RollupIndexer {
EmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState,
Map<String, Object> initialPosition, boolean upgraded, RollupIndexerJobStats stats) {
super(executor, job, initialState, initialPosition, new AtomicBoolean(upgraded), stats);
}
EmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState,
Map<String, Object> initialPosition, boolean upgraded) {
super(executor, job, initialState, initialPosition, new AtomicBoolean(upgraded));
@ -124,7 +131,9 @@ public class RollupIndexerStateTests extends ESTestCase {
}
@Override
protected void onFinish() {}
protected void onFinish(ActionListener<Void> listener) {
listener.onResponse(null);
}
}
private static class DelayedEmptyRollupIndexer extends EmptyRollupIndexer {
@ -140,6 +149,11 @@ public class RollupIndexerStateTests extends ESTestCase {
super(executor, job, initialState, initialPosition, randomBoolean());
}
DelayedEmptyRollupIndexer(Executor executor, RollupJob job, AtomicReference<IndexerState> initialState,
Map<String, Object> initialPosition, RollupIndexerJobStats stats) {
super(executor, job, initialState, initialPosition, randomBoolean(), stats);
}
private CountDownLatch newLatch() {
return latch = new CountDownLatch(1);
}
@ -214,7 +228,9 @@ public class RollupIndexerStateTests extends ESTestCase {
}
@Override
protected void onFinish() {}
protected void onFinish(ActionListener<Void> listener) {
listener.onResponse(null);
}
}
public void testStarted() throws Exception {
@ -248,9 +264,11 @@ public class RollupIndexerStateTests extends ESTestCase {
AtomicBoolean isFinished = new AtomicBoolean(false);
DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null) {
@Override
protected void onFinish() {
super.onFinish();
protected void onFinish(ActionListener<Void> listener) {
super.onFinish(ActionListener.wrap(r -> {
isFinished.set(true);
listener.onResponse(r);
}, listener::onFailure));
}
};
final CountDownLatch latch = indexer.newLatch();
@ -274,24 +292,32 @@ public class RollupIndexerStateTests extends ESTestCase {
public void testStateChangeMidTrigger() throws Exception {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
RollupIndexerJobStats stats = new RollupIndexerJobStats();
RollupIndexerJobStats spyStats = spy(stats);
RollupJobConfig config = mock(RollupJobConfig.class);
// We pull the config before a final state check, so this allows us to flip the state
// We call stats before a final state check, so this allows us to flip the state
// and make sure the appropriate error is thrown
when(config.getGroupConfig()).then((Answer<GroupConfig>) invocationOnMock -> {
Answer<?> forwardAndChangeState = invocation -> {
invocation.callRealMethod();
state.set(IndexerState.STOPPED);
return ConfigTestHelpers.randomGroupConfig(random());
});
return null;
};
doAnswer(forwardAndChangeState).when(spyStats).incrementNumInvocations(1L);
RollupJob job = new RollupJob(config, Collections.emptyMap());
final ExecutorService executor = Executors.newFixedThreadPool(1);
try {
AtomicBoolean isFinished = new AtomicBoolean(false);
DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null) {
DelayedEmptyRollupIndexer indexer = new DelayedEmptyRollupIndexer(executor, job, state, null, spyStats) {
@Override
protected void onFinish() {
super.onFinish();
protected void onFinish(ActionListener<Void> listener) {
super.onFinish(ActionListener.wrap(r -> {
isFinished.set(true);
listener.onResponse(r);
}, listener::onFailure));
}
};
final CountDownLatch latch = indexer.newLatch();
@ -318,7 +344,7 @@ public class RollupIndexerStateTests extends ESTestCase {
try {
EmptyRollupIndexer indexer = new EmptyRollupIndexer(executor, job, state, null) {
@Override
protected void onFinish() {
protected void onFinish(ActionListener<Void> listener) {
fail("Should not have called onFinish");
}