From 7a4860452af3b496dd4d8ab3b005398f21574fea Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 12 Jan 2018 12:59:27 +0100 Subject: [PATCH] Update the persistent task status in the background. Follow up for #3256 --- .../ccr/action/ShardFollowTasksExecutor.java | 23 ++++++++++++------- .../ccr/action/ChunksCoordinatorTests.java | 1 - 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index e27b022d745..62df131c165 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -110,8 +110,15 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor prepare(task, params, leaderGlobalCheckPoint), task::markAsFailed) - ); + persistentTask -> logger.debug("[{}] Successfully updated global checkpoint to {}", + leaderShard, leaderGlobalCheckPoint), + updateStatusException -> { + logger.error(new ParameterizedMessage("[{}] Failed to update global checkpoint to {}", + leaderShard, leaderGlobalCheckPoint), updateStatusException); + task.markAsFailed(updateStatusException); + } + )); + prepare(task, params, leaderGlobalCheckPoint); } else { task.markAsFailed(e); } @@ -171,7 +178,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor new ParameterizedMessage("[{}] Failure starting processor", leaderShard), e); + LOGGER.error(() -> new ParameterizedMessage("{} Failure starting processor", leaderShard), e); postProcessChuck(e); } @@ -204,13 +211,13 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor { if (e == null) { - LOGGER.debug("[{}] Successfully processed chunk [{}/{}]", leaderShard, chunk[0], chunk[1]); + LOGGER.debug("{} Successfully processed chunk [{}/{}]", leaderShard, chunk[0], chunk[1]); processNextChunk(); } else { - LOGGER.error(() -> new ParameterizedMessage("[{}] Failure processing chunk [{}/{}]", + LOGGER.error(() -> new ParameterizedMessage("{} Failure processing chunk [{}/{}]", leaderShard, chunk[0], chunk[1]), e); postProcessChuck(e); } diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java index 1445dd87275..567438d8066 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java @@ -31,7 +31,6 @@ import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow;