mirror of https://github.com/apache/druid.git
fix the case where RTR does not clean up a completed task on startup
This commit is contained in:
parent
0c5a906a1b
commit
1fb6107a37
|
@ -283,14 +283,26 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
|
||||||
{
|
{
|
||||||
RemoteTaskRunnerWorkItem runningTask = runningTasks.get(task.getId());
|
RemoteTaskRunnerWorkItem runningTask = runningTasks.get(task.getId());
|
||||||
if (runningTask != null) {
|
if (runningTask != null) {
|
||||||
log.info("Assigned a task[%s] that is already running, not doing anything", task.getId());
|
ZkWorker zkWorker = findWorkerRunningTask(task.getId());
|
||||||
|
if (zkWorker == null) {
|
||||||
|
log.error("Got task %s that is running but no worker is actually running it?", task.getId());
|
||||||
|
} else {
|
||||||
|
log.info("Task[%s] already running on %s.", task.getId(), zkWorker.getWorker().getHost());
|
||||||
|
TaskStatus status = zkWorker.getRunningTasks().get(task.getId());
|
||||||
|
if (status.isComplete()) {
|
||||||
|
taskComplete(runningTask, zkWorker, task.getId(), status);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return runningTask.getResult();
|
return runningTask.getResult();
|
||||||
}
|
}
|
||||||
|
|
||||||
RemoteTaskRunnerWorkItem pendingTask = pendingTasks.get(task.getId());
|
RemoteTaskRunnerWorkItem pendingTask = pendingTasks.get(task.getId());
|
||||||
if (pendingTask != null) {
|
if (pendingTask != null) {
|
||||||
log.info("Assigned a task[%s] that is already pending, not doing anything", task.getId());
|
log.info("Assigned a task[%s] that is already pending, not doing anything", task.getId());
|
||||||
return pendingTask.getResult();
|
return pendingTask.getResult();
|
||||||
}
|
}
|
||||||
|
|
||||||
RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
|
RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
|
||||||
task,
|
task,
|
||||||
SettableFuture.<TaskStatus>create()
|
SettableFuture.<TaskStatus>create()
|
||||||
|
@ -473,8 +485,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
log.makeAlert("Exception while trying to run task")
|
log.makeAlert("Exception while trying to run task")
|
||||||
.addData("taskId", taskRunnerWorkItem.getTask().getId())
|
.addData("taskId", taskRunnerWorkItem.getTask().getId())
|
||||||
.emit();
|
.emit();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -590,16 +602,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taskStatus.isComplete()) {
|
if (taskStatus.isComplete()) {
|
||||||
if (taskRunnerWorkItem != null) {
|
taskComplete(taskRunnerWorkItem, zkWorker, taskId, taskStatus);
|
||||||
final ListenableFuture<TaskStatus> result = taskRunnerWorkItem.getResult();
|
|
||||||
if (result != null) {
|
|
||||||
((SettableFuture<TaskStatus>) result).set(taskStatus);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Worker is done with this task
|
|
||||||
zkWorker.setLastCompletedTaskTime(new DateTime());
|
|
||||||
cleanup(zkWorker.getWorker().getHost(), taskId);
|
|
||||||
runPendingTasks();
|
runPendingTasks();
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -710,4 +713,23 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
|
||||||
log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values());
|
log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values());
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void taskComplete(
|
||||||
|
RemoteTaskRunnerWorkItem taskRunnerWorkItem,
|
||||||
|
ZkWorker zkWorker,
|
||||||
|
String taskId,
|
||||||
|
TaskStatus taskStatus
|
||||||
|
)
|
||||||
|
{
|
||||||
|
if (taskRunnerWorkItem != null) {
|
||||||
|
final ListenableFuture<TaskStatus> result = taskRunnerWorkItem.getResult();
|
||||||
|
if (result != null) {
|
||||||
|
((SettableFuture<TaskStatus>) result).set(taskStatus);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Worker is done with this task
|
||||||
|
zkWorker.setLastCompletedTaskTime(new DateTime());
|
||||||
|
cleanup(zkWorker.getWorker().getHost(), taskId);
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -39,6 +39,7 @@ import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Holds information about a worker and a listener for task status changes associated with the worker.
|
* Holds information about a worker and a listener for task status changes associated with the worker.
|
||||||
|
@ -49,7 +50,7 @@ public class ZkWorker implements Closeable
|
||||||
private final PathChildrenCache statusCache;
|
private final PathChildrenCache statusCache;
|
||||||
private final Function<ChildData, TaskStatus> cacheConverter;
|
private final Function<ChildData, TaskStatus> cacheConverter;
|
||||||
|
|
||||||
private volatile DateTime lastCompletedTaskTime = new DateTime();
|
private AtomicReference<DateTime> lastCompletedTaskTime = new AtomicReference<DateTime>(new DateTime());
|
||||||
|
|
||||||
public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper)
|
public ZkWorker(Worker worker, PathChildrenCache statusCache, final ObjectMapper jsonMapper)
|
||||||
{
|
{
|
||||||
|
@ -128,7 +129,7 @@ public class ZkWorker implements Closeable
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
public DateTime getLastCompletedTaskTime()
|
public DateTime getLastCompletedTaskTime()
|
||||||
{
|
{
|
||||||
return lastCompletedTaskTime;
|
return lastCompletedTaskTime.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isRunningTask(String taskId)
|
public boolean isRunningTask(String taskId)
|
||||||
|
@ -154,7 +155,7 @@ public class ZkWorker implements Closeable
|
||||||
|
|
||||||
public void setLastCompletedTaskTime(DateTime completedTaskTime)
|
public void setLastCompletedTaskTime(DateTime completedTaskTime)
|
||||||
{
|
{
|
||||||
lastCompletedTaskTime = completedTaskTime;
|
lastCompletedTaskTime.getAndSet(completedTaskTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -258,6 +258,25 @@ public class RemoteTaskRunnerTest
|
||||||
Assert.assertFalse(runningTasks.contains("first"));
|
Assert.assertFalse(runningTasks.contains("first"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRunWithTaskComplete() throws Exception
|
||||||
|
{
|
||||||
|
cf.create()
|
||||||
|
.creatingParentsIfNeeded()
|
||||||
|
.withMode(CreateMode.EPHEMERAL)
|
||||||
|
.forPath(joiner.join(statusPath, task.getId()), jsonMapper.writeValueAsBytes(TaskStatus.success(task.getId())));
|
||||||
|
|
||||||
|
doSetup();
|
||||||
|
|
||||||
|
remoteTaskRunner.bootstrap(Arrays.<Task>asList(task));
|
||||||
|
|
||||||
|
ListenableFuture<TaskStatus> future = remoteTaskRunner.run(task);
|
||||||
|
|
||||||
|
TaskStatus status = future.get();
|
||||||
|
|
||||||
|
Assert.assertEquals(TaskStatus.Status.SUCCESS, status.getStatusCode());
|
||||||
|
}
|
||||||
|
|
||||||
private void doSetup() throws Exception
|
private void doSetup() throws Exception
|
||||||
{
|
{
|
||||||
makeWorker();
|
makeWorker();
|
||||||
|
|
|
@ -47,9 +47,8 @@ public class CostBalancerStrategy implements BalancerStrategy
|
||||||
DataSegment proposalSegment, List<ServerHolder> serverHolders
|
DataSegment proposalSegment, List<ServerHolder> serverHolders
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
ServerHolder holder= chooseBestServer(proposalSegment, serverHolders, false).rhs;
|
ServerHolder holder = chooseBestServer(proposalSegment, serverHolders, false).rhs;
|
||||||
if (holder!=null && !holder.isServingSegment(proposalSegment))
|
if (holder != null && !holder.isServingSegment(proposalSegment)) {
|
||||||
{
|
|
||||||
return holder;
|
return holder;
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
|
@ -65,7 +64,6 @@ public class CostBalancerStrategy implements BalancerStrategy
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* For assignment, we want to move to the lowest cost server that isn't already serving the segment.
|
* For assignment, we want to move to the lowest cost server that isn't already serving the segment.
|
||||||
*
|
*
|
||||||
|
@ -83,25 +81,10 @@ public class CostBalancerStrategy implements BalancerStrategy
|
||||||
{
|
{
|
||||||
|
|
||||||
Pair<Double, ServerHolder> bestServer = Pair.of(Double.POSITIVE_INFINITY, null);
|
Pair<Double, ServerHolder> bestServer = Pair.of(Double.POSITIVE_INFINITY, null);
|
||||||
MinMaxPriorityQueue<Pair<Double, ServerHolder>> costsAndServers = MinMaxPriorityQueue.orderedBy(
|
|
||||||
new Comparator<Pair<Double, ServerHolder>>()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public int compare(
|
|
||||||
Pair<Double, ServerHolder> o,
|
|
||||||
Pair<Double, ServerHolder> o1
|
|
||||||
)
|
|
||||||
{
|
|
||||||
return Double.compare(o.lhs, o1.lhs);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
).create();
|
|
||||||
|
|
||||||
final long proposalSegmentSize = proposalSegment.getSize();
|
final long proposalSegmentSize = proposalSegment.getSize();
|
||||||
|
|
||||||
for (ServerHolder server : serverHolders) {
|
for (ServerHolder server : serverHolders) {
|
||||||
if (includeCurrentServer || !server.isServingSegment(proposalSegment))
|
if (includeCurrentServer || !server.isServingSegment(proposalSegment)) {
|
||||||
{
|
|
||||||
/** Don't calculate cost if the server doesn't have enough space or is loading the segment */
|
/** Don't calculate cost if the server doesn't have enough space or is loading the segment */
|
||||||
if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) {
|
if (proposalSegmentSize > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -159,10 +142,10 @@ public class CostBalancerStrategy implements BalancerStrategy
|
||||||
referenceTimestamp.getMillis() - segment1.getInterval().getEndMillis(),
|
referenceTimestamp.getMillis() - segment1.getInterval().getEndMillis(),
|
||||||
referenceTimestamp.getMillis() - segment2.getInterval().getEndMillis()
|
referenceTimestamp.getMillis() - segment2.getInterval().getEndMillis()
|
||||||
);
|
);
|
||||||
double segment1diff=referenceTimestamp.getMillis()-segment1.getInterval().getEndMillis();
|
double segment1diff = referenceTimestamp.getMillis() - segment1.getInterval().getEndMillis();
|
||||||
double segment2diff=referenceTimestamp.getMillis()-segment2.getInterval().getEndMillis();
|
double segment2diff = referenceTimestamp.getMillis() - segment2.getInterval().getEndMillis();
|
||||||
if (segment1diff < SEVEN_DAYS_IN_MILLIS && segment2diff <SEVEN_DAYS_IN_MILLIS) {
|
if (segment1diff < SEVEN_DAYS_IN_MILLIS && segment2diff < SEVEN_DAYS_IN_MILLIS) {
|
||||||
recencyPenalty = (2 - segment1diff / SEVEN_DAYS_IN_MILLIS)*(2-segment2diff /SEVEN_DAYS_IN_MILLIS);
|
recencyPenalty = (2 - segment1diff / SEVEN_DAYS_IN_MILLIS) * (2 - segment2diff / SEVEN_DAYS_IN_MILLIS);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** gap is null if the two segment intervals overlap or if they're adjacent */
|
/** gap is null if the two segment intervals overlap or if they're adjacent */
|
||||||
|
@ -251,6 +234,4 @@ public class CostBalancerStrategy implements BalancerStrategy
|
||||||
);
|
);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
Loading…
Reference in New Issue