Required changes after merging in master.

This commit is contained in:
Martijn van Groningen 2018-05-30 10:26:49 +02:00
parent 56472d6505
commit 4a20dca5fe
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
5 changed files with 8 additions and 4 deletions

View File

@ -115,7 +115,7 @@ public class AllocatedPersistentTask extends CancellableTask {
persistentTasksService.waitForPersistentTaskCondition(persistentTaskId, predicate, timeout, listener);
}
final boolean isCompleted() {
protected final boolean isCompleted() {
return state.get() == State.COMPLETED;
}

View File

@ -238,7 +238,7 @@ public class FollowIndexAction extends Action<FollowIndexAction.Request,
new ShardId(followIndexMetadata.getIndex(), shardId),
new ShardId(leaderIndexMetadata.getIndex(), shardId),
request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes, filteredHeaders);
persistentTasksService.startPersistentTask(taskId, ShardFollowTask.NAME, shardFollowTask,
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> task) {

View File

@ -34,6 +34,10 @@ public class ShardFollowNodeTask extends AllocatedPersistentTask {
markAsCompleted();
}
public boolean isRunning() {
return isCancelled() == false && isCompleted() == false;
}
void updateProcessedGlobalCheckpoint(long processedGlobalCheckpoint) {
this.processedGlobalCheckpoint.set(processedGlobalCheckpoint);
}

View File

@ -130,7 +130,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
void prepare(Client leaderClient, Client followerClient, ShardFollowNodeTask task, ShardFollowTask params,
long followGlobalCheckPoint,
IndexMetadataVersionChecker imdVersionChecker) {
if (task.getState() != AllocatedPersistentTask.State.STARTED) {
if (task.isRunning() == false) {
// TODO: need better cancellation control
return;
}

View File

@ -141,7 +141,7 @@ public class UnfollowIndexAction extends Action<UnfollowIndexAction.Request, Unf
for (int i = 0; i < numShards; i++) {
final int shardId = i;
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
persistentTasksService.cancelPersistentTask(taskId,
persistentTasksService.sendRemoveRequest(taskId,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> task) {