From 433469d4d05a8cb222ff0602dad6fd1664b55b94 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 8 Nov 2018 16:28:48 +0100 Subject: [PATCH] [Rollup] improve handling of failures on first search (#35269) Improve error handling in the Indexer if an exception occurs during the very 1st retrieval (query execution) --- .../core/indexing/AsyncTwoPhaseIndexer.java | 9 +- .../indexing/AsyncTwoPhaseIndexerTests.java | 92 ++++++++++++++++++- 2 files changed, 98 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index ee0c0de97e0..c121b263ad6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -151,8 +151,13 @@ public abstract class AsyncTwoPhaseIndexer doNextSearch(buildSearchRequest(), - ActionListener.wrap(this::onSearchResponse, exc -> finishWithFailure(exc)))); + executor.execute(() -> { + try { + doNextSearch(buildSearchRequest(), ActionListener.wrap(this::onSearchResponse, exc -> finishWithFailure(exc))); + } catch (Exception e) { + finishWithFailure(e); + } + }); logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]"); return true; } else { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java index 2662e05570c..46d09e30eae 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java @@ -110,6 +110,78 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase { } + private class MockIndexerThrowsFirstSearch extends AsyncTwoPhaseIndexer { + + // test the execution order + private int step; + + protected MockIndexerThrowsFirstSearch(Executor executor, AtomicReference initialState, Integer initialPosition) { + super(executor, initialState, initialPosition, new MockJobStats()); + } + + @Override + protected String getJobId() { + return "mock"; + } + + @Override + protected IterationResult doProcess(SearchResponse searchResponse) { + fail("should not be called"); + return null; + } + + @Override + protected SearchRequest buildSearchRequest() { + assertThat(step, equalTo(1)); + ++step; + return null; + } + + @Override + protected void onStartJob(long now) { + assertThat(step, equalTo(0)); + ++step; + } + + @Override + protected void doNextSearch(SearchRequest request, ActionListener nextPhase) { + throw new RuntimeException("Failed to build search request"); + } + + @Override + protected void doNextBulk(BulkRequest request, ActionListener nextPhase) { + fail("should not be called"); + } + + @Override + protected void doSaveState(IndexerState state, Integer position, Runnable next) { + assertThat(step, equalTo(2)); + ++step; + next.run(); + } + + @Override + protected void onFailure(Exception exc) { + assertThat(step, equalTo(3)); + ++step; + isFinished.set(true); + } + + @Override + protected void onFinish() { + fail("should not be called"); + } + + @Override + protected void onAbort() { + fail("should not be called"); + } + + public int getStep() { + return step; + } + } + private static class MockJobStats extends IndexerJobStats { @Override @@ -121,7 +193,7 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase { public void testStateMachine() throws InterruptedException { AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); final ExecutorService executor = Executors.newFixedThreadPool(1); - + isFinished.set(false); try { MockIndexer indexer = new MockIndexer(executor, state, 2); @@ -140,4 +212,22 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase { executor.shutdownNow(); } } + + public void testStateMachineBrokenSearch() throws InterruptedException { + AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); + final ExecutorService executor = Executors.newFixedThreadPool(1); + isFinished.set(false); + try { + + MockIndexerThrowsFirstSearch indexer = new MockIndexerThrowsFirstSearch(executor, state, 2); + indexer.start(); + assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); + assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); + assertTrue(ESTestCase.awaitBusy(() -> isFinished.get())); + assertThat(indexer.getStep(), equalTo(4)); + + } finally { + executor.shutdownNow(); + } + } }