Fix RollupDocumentation test to wait for job to stop

Also adds some extra state debug information to various log messages
This commit is contained in:
Zachary Tong 2019-01-11 13:16:05 -05:00
parent 827ece73c8
commit de52ba1f78
3 changed files with 19 additions and 5 deletions
client/rest-high-level/src/test/java/org/elasticsearch/client/documentation
x-pack/plugin
core/src/main/java/org/elasticsearch/xpack/core/indexing
rollup/src/main/java/org/elasticsearch/xpack/rollup/action

@ -261,6 +261,14 @@ public class RollupDocumentationIT extends ESRestHighLevelClientTestCase {
} catch (Exception e) { } catch (Exception e) {
// Swallow any exception, this test does not test actually cancelling. // Swallow any exception, this test does not test actually cancelling.
} }
// stop job to prevent spamming exceptions on next start request
StopRollupJobRequest stopRequest = new StopRollupJobRequest(id);
stopRequest.waitForCompletion();
stopRequest.timeout(TimeValue.timeValueSeconds(10));
StopRollupJobResponse response = client.rollup().stopRollupJob(stopRequest, RequestOptions.DEFAULT);
assertTrue(response.isAcknowledged());
// tag::rollup-start-job-execute-listener // tag::rollup-start-job-execute-listener
ActionListener<StartRollupJobResponse> listener = new ActionListener<StartRollupJobResponse>() { ActionListener<StartRollupJobResponse> listener = new ActionListener<StartRollupJobResponse>() {
@Override @Override
@ -282,7 +290,8 @@ public class RollupDocumentationIT extends ESRestHighLevelClientTestCase {
assertTrue(latch.await(30L, TimeUnit.SECONDS)); assertTrue(latch.await(30L, TimeUnit.SECONDS));
// stop job so it can correctly be deleted by the test teardown // stop job so it can correctly be deleted by the test teardown
rc.stopRollupJob(new StopRollupJobRequest(id), RequestOptions.DEFAULT); response = rc.stopRollupJob(stopRequest, RequestOptions.DEFAULT);
assertTrue(response.isAcknowledged());
} }
@SuppressWarnings("unused") @SuppressWarnings("unused")

@ -137,7 +137,8 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
case INDEXING: case INDEXING:
case STOPPING: case STOPPING:
case ABORTING: case ABORTING:
logger.warn("Schedule was triggered for job [" + getJobId() + "], but prior indexer is still running."); logger.warn("Schedule was triggered for job [" + getJobId() + "], but prior indexer is still running " +
"(with state [" + currentState + "]");
return false; return false;
case STOPPED: case STOPPED:
@ -381,8 +382,7 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
case STOPPING: case STOPPING:
logger.info("Indexer job encountered [" + IndexerState.STOPPING + "] state, halting indexer."); logger.info("Indexer job encountered [" + IndexerState.STOPPING + "] state, halting indexer.");
doSaveState(finishAndSetState(), getPosition(), () -> { doSaveState(finishAndSetState(), getPosition(), () -> {});
});
return false; return false;
case STOPPED: case STOPPED:

@ -81,10 +81,15 @@ public class TransportStopRollupAction extends TransportTasksAction<RollupJobTas
listener.onResponse(response); listener.onResponse(response);
} else { } else {
listener.onFailure(new ElasticsearchTimeoutException("Timed out after [" + request.timeout().getStringRep() listener.onFailure(new ElasticsearchTimeoutException("Timed out after [" + request.timeout().getStringRep()
+ "] while waiting for rollup job [" + request.getId() + "] to stop")); + "] while waiting for rollup job [" + request.getId() + "] to stop. State was ["
+ ((RollupJobStatus) jobTask.getStatus()).getIndexerState() + "]"));
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
listener.onFailure(e); listener.onFailure(e);
} catch (Exception e) {
listener.onFailure(new ElasticsearchTimeoutException("Encountered unexpected error while waiting for " +
"rollup job [" + request.getId() + "] to stop. State was ["
+ ((RollupJobStatus) jobTask.getStatus()).getIndexerState() + "].", e));
} }
}); });