Update the persistent task status in the background.

Follow up for #3256
This commit is contained in:
Martijn van Groningen 2018-01-12 12:59:27 +01:00
parent bad5135d35
commit 7a4860452a
2 changed files with 15 additions and 9 deletions

View File

@ -110,8 +110,15 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
ShardFollowTask.Status newStatus = new ShardFollowTask.Status();
newStatus.setProcessedGlobalCheckpoint(leaderGlobalCheckPoint);
task.updatePersistentStatus(newStatus, ActionListener.wrap(
persistentTask -> prepare(task, params, leaderGlobalCheckPoint), task::markAsFailed)
);
persistentTask -> logger.debug("[{}] Successfully updated global checkpoint to {}",
leaderShard, leaderGlobalCheckPoint),
updateStatusException -> {
logger.error(new ParameterizedMessage("[{}] Failed to update global checkpoint to {}",
leaderShard, leaderGlobalCheckPoint), updateStatusException);
task.markAsFailed(updateStatusException);
}
));
prepare(task, params, leaderGlobalCheckPoint);
} else {
task.markAsFailed(e);
}
@ -171,7 +178,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
}
void createChucks(long from, long to) {
LOGGER.debug("[{}] Creating chunks for operation range [{}] to [{}]", leaderShard, from, to);
LOGGER.debug("{} Creating chunks for operation range [{}] to [{}]", leaderShard, from, to);
for (long i = from; i < to; i += batchSize) {
long v2 = i + batchSize < to ? i + batchSize : to;
chunks.add(new long[]{i == from ? i : i + 1, v2});
@ -179,14 +186,14 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
}
void start() {
LOGGER.debug("[{}] Start coordination of [{}] chunks with [{}] concurrent processors",
LOGGER.debug("{} Start coordination of [{}] chunks with [{}] concurrent processors",
leaderShard, chunks.size(), concurrentProcessors);
for (int i = 0; i < concurrentProcessors; i++) {
ccrExecutor.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
assert e != null;
LOGGER.error(() -> new ParameterizedMessage("[{}] Failure starting processor", leaderShard), e);
LOGGER.error(() -> new ParameterizedMessage("{} Failure starting processor", leaderShard), e);
postProcessChuck(e);
}
@ -204,13 +211,13 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
postProcessChuck(null);
return;
}
LOGGER.debug("[{}] Processing chunk [{}/{}]", leaderShard, chunk[0], chunk[1]);
LOGGER.debug("{} Processing chunk [{}/{}]", leaderShard, chunk[0], chunk[1]);
ChunkProcessor processor = new ChunkProcessor(client, ccrExecutor, leaderShard, followerShard, e -> {
if (e == null) {
LOGGER.debug("[{}] Successfully processed chunk [{}/{}]", leaderShard, chunk[0], chunk[1]);
LOGGER.debug("{} Successfully processed chunk [{}/{}]", leaderShard, chunk[0], chunk[1]);
processNextChunk();
} else {
LOGGER.error(() -> new ParameterizedMessage("[{}] Failure processing chunk [{}/{}]",
LOGGER.error(() -> new ParameterizedMessage("{} Failure processing chunk [{}/{}]",
leaderShard, chunk[0], chunk[1]), e);
postProcessChuck(e);
}

View File

@ -31,7 +31,6 @@ import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;