diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java b/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java index 3f5441318a5..689cbdce307 100644 --- a/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java +++ b/server/src/main/java/org/apache/druid/rpc/indexing/SpecificTaskServiceLocator.java @@ -23,14 +23,13 @@ import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.errorprone.annotations.concurrent.GuardedBy; import org.apache.druid.client.indexing.TaskStatusResponse; -import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.rpc.ServiceLocation; @@ -57,7 +56,6 @@ public class SpecificTaskServiceLocator implements ServiceLocator private final String taskId; private final OverlordClient overlordClient; - private final TaskLocationFetcher locationFetcher = new TaskLocationFetcher(); private final Object lock = new Object(); @GuardedBy("lock") @@ -125,42 +123,15 @@ public class SpecificTaskServiceLocator implements ServiceLocator lastUpdateTime = System.currentTimeMillis(); final TaskStatus status = taskStatusMap.get(taskId); - if (status == null) { // If the task status is unknown, we'll treat it as closed. - lastKnownState = null; - lastKnownLocation = null; + resolvePendingFuture(null, null); + } else if (TaskLocation.unknown().equals(status.getLocation())) { + // Do not resolve the future just yet, try the fallback API instead + fetchFallbackTaskLocation(); } else { - lastKnownState = status.getStatusCode(); - final TaskLocation location; - if (TaskLocation.unknown().equals(status.getLocation())) { - location = locationFetcher.getLocation(); - } else { - location = status.getLocation(); - } - - if (TaskLocation.unknown().equals(location)) { - lastKnownLocation = null; - } else { - lastKnownLocation = new ServiceLocation( - location.getHost(), - location.getPort(), - location.getTlsPort(), - StringUtils.format("%s/%s", BASE_PATH, StringUtils.urlEncode(taskId)) - ); - } + resolvePendingFuture(status.getStatusCode(), status.getLocation()); } - - if (lastKnownState != TaskState.RUNNING) { - pendingFuture.set(ServiceLocations.closed()); - } else if (lastKnownLocation == null) { - pendingFuture.set(ServiceLocations.forLocations(Collections.emptySet())); - } else { - pendingFuture.set(ServiceLocations.forLocation(lastKnownLocation)); - } - - // Clear pendingFuture once it has been set. - pendingFuture = null; } } } @@ -168,17 +139,10 @@ public class SpecificTaskServiceLocator implements ServiceLocator @Override public void onFailure(Throwable t) { - synchronized (lock) { - if (pendingFuture != null) { - pendingFuture.setException(t); - - // Clear pendingFuture once it has been set. - pendingFuture = null; - } - } + resolvePendingFutureOnException(t); } }, - MoreExecutors.directExecutor() + Execs.directExecutor() ); return Futures.nonCancellationPropagating(retVal); @@ -209,18 +173,104 @@ public class SpecificTaskServiceLocator implements ServiceLocator } } - private class TaskLocationFetcher + private void resolvePendingFuture(TaskState state, TaskLocation location) { - TaskLocation getLocation() - { - final TaskStatusResponse statusResponse = FutureUtils.getUnchecked( - overlordClient.taskStatus(taskId), - true - ); - if (statusResponse == null || statusResponse.getStatus() == null) { - return TaskLocation.unknown(); - } else { - return statusResponse.getStatus().getLocation(); + synchronized (lock) { + if (pendingFuture != null) { + lastKnownState = state; + lastKnownLocation = location == null ? null : new ServiceLocation( + location.getHost(), + location.getPort(), + location.getTlsPort(), + StringUtils.format("%s/%s", BASE_PATH, StringUtils.urlEncode(taskId)) + ); + + if (lastKnownState != TaskState.RUNNING) { + pendingFuture.set(ServiceLocations.closed()); + } else if (lastKnownLocation == null) { + pendingFuture.set(ServiceLocations.forLocations(Collections.emptySet())); + } else { + pendingFuture.set(ServiceLocations.forLocation(lastKnownLocation)); + } + + // Clear pendingFuture once it has been set. + pendingFuture = null; + } + } + } + + private void resolvePendingFutureOnException(Throwable t) + { + synchronized (lock) { + if (pendingFuture != null) { + pendingFuture.setException(t); + + // Clear pendingFuture once it has been set. + pendingFuture = null; + } + } + } + + /** + * Invokes the single task status API {@link OverlordClient#taskStatus} if the + * multi-task status API returns an unknown location (this can happen if the + * Overlord is running on a version older than Druid 30.0.0 (pre #15724)). + */ + private void fetchFallbackTaskLocation() + { + synchronized (lock) { + if (pendingFuture != null) { + final ListenableFuture taskStatusFuture; + try { + taskStatusFuture = overlordClient.taskStatus(taskId); + } + catch (Exception e) { + resolvePendingFutureOnException(e); + return; + } + + pendingFuture.addListener( + () -> { + if (!taskStatusFuture.isDone()) { + // pendingFuture may resolve without taskStatusFuture due to close(). + taskStatusFuture.cancel(true); + } + }, + Execs.directExecutor() + ); + + Futures.addCallback( + taskStatusFuture, + new FutureCallback() + { + @Override + public void onSuccess(final TaskStatusResponse taskStatusResponse) + { + synchronized (lock) { + if (pendingFuture != null) { + lastUpdateTime = System.currentTimeMillis(); + + final TaskStatusPlus status = taskStatusResponse.getStatus(); + if (status == null) { + // If the task status is unknown, we'll treat it as closed. + resolvePendingFuture(null, null); + } else if (TaskLocation.unknown().equals(status.getLocation())) { + resolvePendingFuture(status.getStatusCode(), null); + } else { + resolvePendingFuture(status.getStatusCode(), status.getLocation()); + } + } + } + } + + @Override + public void onFailure(Throwable t) + { + resolvePendingFutureOnException(t); + } + }, + Execs.directExecutor() + ); } } }