Fix fetch of task location in SpecificTaskServiceLocator (#16462)

* Fix fetch of task location in SpecificTaskServiceLocator

* Resolve future if exception occurs while invoking API

* Remove unused import
This commit is contained in:
Kashif Faraz 2024-05-20 12:35:04 +05:30 committed by GitHub
parent a124c6cbbd
commit 15d27f340d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 105 additions and 55 deletions

View File

@ -23,14 +23,13 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.rpc.ServiceLocation;
@ -57,7 +56,6 @@ public class SpecificTaskServiceLocator implements ServiceLocator
private final String taskId;
private final OverlordClient overlordClient;
private final TaskLocationFetcher locationFetcher = new TaskLocationFetcher();
private final Object lock = new Object();
@GuardedBy("lock")
@ -125,42 +123,15 @@ public class SpecificTaskServiceLocator implements ServiceLocator
lastUpdateTime = System.currentTimeMillis();
final TaskStatus status = taskStatusMap.get(taskId);
if (status == null) {
// If the task status is unknown, we'll treat it as closed.
lastKnownState = null;
lastKnownLocation = null;
resolvePendingFuture(null, null);
} else if (TaskLocation.unknown().equals(status.getLocation())) {
// Do not resolve the future just yet, try the fallback API instead
fetchFallbackTaskLocation();
} else {
lastKnownState = status.getStatusCode();
final TaskLocation location;
if (TaskLocation.unknown().equals(status.getLocation())) {
location = locationFetcher.getLocation();
} else {
location = status.getLocation();
}
if (TaskLocation.unknown().equals(location)) {
lastKnownLocation = null;
} else {
lastKnownLocation = new ServiceLocation(
location.getHost(),
location.getPort(),
location.getTlsPort(),
StringUtils.format("%s/%s", BASE_PATH, StringUtils.urlEncode(taskId))
);
}
resolvePendingFuture(status.getStatusCode(), status.getLocation());
}
if (lastKnownState != TaskState.RUNNING) {
pendingFuture.set(ServiceLocations.closed());
} else if (lastKnownLocation == null) {
pendingFuture.set(ServiceLocations.forLocations(Collections.emptySet()));
} else {
pendingFuture.set(ServiceLocations.forLocation(lastKnownLocation));
}
// Clear pendingFuture once it has been set.
pendingFuture = null;
}
}
}
@ -168,17 +139,10 @@ public class SpecificTaskServiceLocator implements ServiceLocator
@Override
public void onFailure(Throwable t)
{
synchronized (lock) {
if (pendingFuture != null) {
pendingFuture.setException(t);
// Clear pendingFuture once it has been set.
pendingFuture = null;
}
}
resolvePendingFutureOnException(t);
}
},
MoreExecutors.directExecutor()
Execs.directExecutor()
);
return Futures.nonCancellationPropagating(retVal);
@ -209,18 +173,104 @@ public class SpecificTaskServiceLocator implements ServiceLocator
}
}
private class TaskLocationFetcher
private void resolvePendingFuture(TaskState state, TaskLocation location)
{
TaskLocation getLocation()
{
final TaskStatusResponse statusResponse = FutureUtils.getUnchecked(
overlordClient.taskStatus(taskId),
true
);
if (statusResponse == null || statusResponse.getStatus() == null) {
return TaskLocation.unknown();
} else {
return statusResponse.getStatus().getLocation();
synchronized (lock) {
if (pendingFuture != null) {
lastKnownState = state;
lastKnownLocation = location == null ? null : new ServiceLocation(
location.getHost(),
location.getPort(),
location.getTlsPort(),
StringUtils.format("%s/%s", BASE_PATH, StringUtils.urlEncode(taskId))
);
if (lastKnownState != TaskState.RUNNING) {
pendingFuture.set(ServiceLocations.closed());
} else if (lastKnownLocation == null) {
pendingFuture.set(ServiceLocations.forLocations(Collections.emptySet()));
} else {
pendingFuture.set(ServiceLocations.forLocation(lastKnownLocation));
}
// Clear pendingFuture once it has been set.
pendingFuture = null;
}
}
}
private void resolvePendingFutureOnException(Throwable t)
{
synchronized (lock) {
if (pendingFuture != null) {
pendingFuture.setException(t);
// Clear pendingFuture once it has been set.
pendingFuture = null;
}
}
}
/**
* Invokes the single task status API {@link OverlordClient#taskStatus} if the
* multi-task status API returns an unknown location (this can happen if the
* Overlord is running on a version older than Druid 30.0.0 (pre #15724)).
*/
private void fetchFallbackTaskLocation()
{
synchronized (lock) {
if (pendingFuture != null) {
final ListenableFuture<TaskStatusResponse> taskStatusFuture;
try {
taskStatusFuture = overlordClient.taskStatus(taskId);
}
catch (Exception e) {
resolvePendingFutureOnException(e);
return;
}
pendingFuture.addListener(
() -> {
if (!taskStatusFuture.isDone()) {
// pendingFuture may resolve without taskStatusFuture due to close().
taskStatusFuture.cancel(true);
}
},
Execs.directExecutor()
);
Futures.addCallback(
taskStatusFuture,
new FutureCallback<TaskStatusResponse>()
{
@Override
public void onSuccess(final TaskStatusResponse taskStatusResponse)
{
synchronized (lock) {
if (pendingFuture != null) {
lastUpdateTime = System.currentTimeMillis();
final TaskStatusPlus status = taskStatusResponse.getStatus();
if (status == null) {
// If the task status is unknown, we'll treat it as closed.
resolvePendingFuture(null, null);
} else if (TaskLocation.unknown().equals(status.getLocation())) {
resolvePendingFuture(status.getStatusCode(), null);
} else {
resolvePendingFuture(status.getStatusCode(), status.getLocation());
}
}
}
}
@Override
public void onFailure(Throwable t)
{
resolvePendingFutureOnException(t);
}
},
Execs.directExecutor()
);
}
}
}