pass down batch size correctly

This commit is contained in:
Martijn van Groningen 2017-11-30 16:46:04 +01:00
parent 3cd5eed91e
commit cc5665bce6
1 changed files with 5 additions and 5 deletions

View File

@ -68,10 +68,10 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
} else { } else {
followGlobalCheckPoint = SequenceNumbers.NO_OPS_PERFORMED; followGlobalCheckPoint = SequenceNumbers.NO_OPS_PERFORMED;
} }
prepare(task, params.getLeaderShardId(), params.getFollowShardId(), followGlobalCheckPoint); prepare(task, params.getLeaderShardId(), params.getFollowShardId(), params.getBatchSize(), followGlobalCheckPoint);
} }
void prepare(AllocatedPersistentTask task, ShardId leaderShard, ShardId followerShard, long followGlobalCheckPoint) { void prepare(AllocatedPersistentTask task, ShardId leaderShard, ShardId followerShard, long batchSize, long followGlobalCheckPoint) {
if (task.getState() != AllocatedPersistentTask.State.STARTED) { if (task.getState() != AllocatedPersistentTask.State.STARTED) {
// TODO: need better cancellation control // TODO: need better cancellation control
return; return;
@ -97,14 +97,14 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
ShardFollowTask.Status newStatus = new ShardFollowTask.Status(); ShardFollowTask.Status newStatus = new ShardFollowTask.Status();
newStatus.setProcessedGlobalCheckpoint(leaderGlobalCheckPoint); newStatus.setProcessedGlobalCheckpoint(leaderGlobalCheckPoint);
task.updatePersistentStatus(newStatus, ActionListener.wrap(persistentTask -> prepare(task, task.updatePersistentStatus(newStatus, ActionListener.wrap(persistentTask -> prepare(task,
leaderShard, followerShard, leaderGlobalCheckPoint), task::markAsFailed) leaderShard, followerShard, batchSize, leaderGlobalCheckPoint), task::markAsFailed)
); );
} else { } else {
task.markAsFailed(e); task.markAsFailed(e);
} }
}; };
ChunksCoordinator coordinator = ChunksCoordinator coordinator =
new ChunksCoordinator(client, ccrExecutor, DEFAULT_BATCH_SIZE, leaderShard, followerShard, handler); new ChunksCoordinator(client, ccrExecutor, batchSize, leaderShard, followerShard, handler);
coordinator.createChucks(followGlobalCheckPoint, leaderGlobalCheckPoint); coordinator.createChucks(followGlobalCheckPoint, leaderGlobalCheckPoint);
coordinator.processChuck(); coordinator.processChuck();
} }
@ -123,7 +123,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
@Override @Override
protected void doRun() throws Exception { protected void doRun() throws Exception {
prepare(task, leaderShard, followerShard, followGlobalCheckPoint); prepare(task, leaderShard, followerShard, params.getBatchSize(), followGlobalCheckPoint);
} }
}); });
} }