Fix NPE in RemoteTaskRunner when some tasks in ZooKeeper but not in Overlord (#5511)

* Fix NPE in RemoteTaskRunner when some tasks in ZooKeeper but not in Overlord

* revert unnecessary change
This commit is contained in:
Jihoon Son 2018-04-03 21:15:58 -07:00 committed by Jonathan Wei
parent f0a94f5035
commit 7239f56131
5 changed files with 55 additions and 9 deletions

View File

@ -968,7 +968,7 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
announcement.getTaskType(),
zkWorker.getWorker(),
TaskLocation.unknown(),
runningTasks.get(taskId).getDataSource()
announcement.getTaskDataSource()
);
final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent(
taskId,

View File

@ -387,7 +387,8 @@ public class WorkerHolder
announcement.getTaskType(),
announcement.getTaskResource(),
TaskStatus.failure(announcement.getTaskId()),
announcement.getTaskLocation()
announcement.getTaskLocation(),
announcement.getTaskDataSource()
));
}
}
@ -423,7 +424,8 @@ public class WorkerHolder
announcement.getTaskType(),
announcement.getTaskResource(),
TaskStatus.failure(announcement.getTaskId()),
announcement.getTaskLocation()
announcement.getTaskLocation(),
announcement.getTaskDataSource()
));
}
} else if (change instanceof WorkerHistoryItem.Metadata) {

View File

@ -28,6 +28,8 @@ import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource;
import javax.annotation.Nullable;
/**
* Used by workers to announce the status of tasks they are currently running. This class is immutable.
*/
@ -38,9 +40,12 @@ public class TaskAnnouncement
private final TaskResource taskResource;
private final TaskLocation taskLocation;
@Nullable
private final String taskDataSource; // nullable for backward compatibility
public static TaskAnnouncement create(Task task, TaskStatus status, TaskLocation location)
{
return create(task.getId(), task.getType(), task.getTaskResource(), status, location);
return create(task.getId(), task.getType(), task.getTaskResource(), status, location, task.getDataSource());
}
public static TaskAnnouncement create(
@ -48,11 +53,12 @@ public class TaskAnnouncement
String taskType,
TaskResource resource,
TaskStatus status,
TaskLocation location
TaskLocation location,
String taskDataSource
)
{
Preconditions.checkArgument(status.getId().equals(taskId), "task id == status id");
return new TaskAnnouncement(null, taskType, null, status, resource, location);
return new TaskAnnouncement(null, taskType, null, status, resource, location, taskDataSource);
}
@JsonCreator
@ -62,7 +68,8 @@ public class TaskAnnouncement
@JsonProperty("status") TaskState status,
@JsonProperty("taskStatus") TaskStatus taskStatus,
@JsonProperty("taskResource") TaskResource taskResource,
@JsonProperty("taskLocation") TaskLocation taskLocation
@JsonProperty("taskLocation") TaskLocation taskLocation,
@JsonProperty("taskDataSource") String taskDataSource
)
{
this.taskType = taskType;
@ -74,6 +81,7 @@ public class TaskAnnouncement
}
this.taskResource = taskResource == null ? new TaskResource(this.taskStatus.getId(), 1) : taskResource;
this.taskLocation = taskLocation == null ? TaskLocation.unknown() : taskLocation;
this.taskDataSource = taskDataSource;
}
@JsonProperty("id")
@ -112,13 +120,21 @@ public class TaskAnnouncement
return taskLocation;
}
@JsonProperty("taskDataSource")
public String getTaskDataSource()
{
return taskDataSource;
}
@Override
public String toString()
{
return "TaskAnnouncement{" +
"taskStatus=" + taskStatus +
"taskType=" + taskType +
", taskStatus=" + taskStatus +
", taskResource=" + taskResource +
", taskLocation=" + taskLocation +
", taskDataSource=" + taskDataSource +
'}';
}
}

View File

@ -136,7 +136,8 @@ public class WorkerTaskMonitor extends WorkerTaskManager
announcement.getTaskType(),
announcement.getTaskResource(),
completionStatus,
TaskLocation.unknown()
TaskLocation.unknown(),
announcement.getTaskDataSource()
)
);
}

View File

@ -447,6 +447,33 @@ public class RemoteTaskRunnerTest
Assert.assertEquals("", Iterables.getOnlyElement(remoteTaskRunner.getWorkers()).getWorker().getVersion());
}
@Test
public void testRestartRemoteTaskRunner() throws Exception
{
doSetup();
remoteTaskRunner.run(task);
Assert.assertTrue(taskAnnounced(task.getId()));
mockWorkerRunningTask(task);
Assert.assertTrue(workerRunningTask(task.getId()));
remoteTaskRunner.stop();
makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(new Period("PT5S")));
final RemoteTaskRunnerWorkItem newWorkItem = remoteTaskRunner
.getKnownTasks()
.stream()
.filter(workItem -> workItem.getTaskId().equals(task.getId()))
.findFirst()
.orElse(null);
final ListenableFuture<TaskStatus> result = newWorkItem.getResult();
mockWorkerCompleteSuccessfulTask(task);
Assert.assertTrue(workerCompletedTask(result));
Assert.assertEquals(task.getId(), result.get().getId());
Assert.assertEquals(TaskState.SUCCESS, result.get().getStatusCode());
}
private void doSetup() throws Exception
{
makeWorker();