DFA Get Stats can return multiple responses if more than one error occurs (#60950)

If the search for get stats with multiple job Ids fails the listener is called for each failure. 
This change waits for all responses then returns the first error if there was one.
This commit is contained in:
David Kyle 2020-08-11 10:28:05 +01:00 committed by GitHub
parent a5ef38ca40
commit 18a65c5b9a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 31 additions and 3 deletions

View File

@ -63,6 +63,7 @@ import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
@ -167,6 +168,7 @@ public class TransportGetDataFrameAnalyticsStatsAction
AtomicInteger counter = new AtomicInteger(stoppedConfigs.size()); AtomicInteger counter = new AtomicInteger(stoppedConfigs.size());
AtomicArray<Stats> jobStats = new AtomicArray<>(stoppedConfigs.size()); AtomicArray<Stats> jobStats = new AtomicArray<>(stoppedConfigs.size());
AtomicReference<Exception> searchException = new AtomicReference<>();
for (int i = 0; i < stoppedConfigs.size(); i++) { for (int i = 0; i < stoppedConfigs.size(); i++) {
final int slot = i; final int slot = i;
DataFrameAnalyticsConfig config = stoppedConfigs.get(i); DataFrameAnalyticsConfig config = stoppedConfigs.get(i);
@ -174,6 +176,10 @@ public class TransportGetDataFrameAnalyticsStatsAction
stats -> { stats -> {
jobStats.set(slot, stats); jobStats.set(slot, stats);
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
if (searchException.get() != null) {
listener.onFailure(searchException.get());
return;
}
List<Stats> allTasksStats = new ArrayList<>(runningTasksResponse.getResponse().results()); List<Stats> allTasksStats = new ArrayList<>(runningTasksResponse.getResponse().results());
allTasksStats.addAll(jobStats.asList()); allTasksStats.addAll(jobStats.asList());
Collections.sort(allTasksStats, Comparator.comparing(Stats::getId)); Collections.sort(allTasksStats, Comparator.comparing(Stats::getId));
@ -181,7 +187,13 @@ public class TransportGetDataFrameAnalyticsStatsAction
allTasksStats, allTasksStats.size(), GetDataFrameAnalyticsAction.Response.RESULTS_FIELD))); allTasksStats, allTasksStats.size(), GetDataFrameAnalyticsAction.Response.RESULTS_FIELD)));
} }
}, },
listener::onFailure) e -> {
// take the first error
searchException.compareAndSet(null, e);
if (counter.decrementAndGet() == 0) {
listener.onFailure(e);
}
})
); );
} }
} }

View File

@ -46,6 +46,7 @@ import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -143,7 +144,17 @@ public class TransportGetJobsStatsAction extends TransportTasksAction<TransportO
} }
AtomicInteger counter = new AtomicInteger(closedJobIds.size()); AtomicInteger counter = new AtomicInteger(closedJobIds.size());
AtomicReference<Exception> searchException = new AtomicReference<>();
AtomicArray<GetJobsStatsAction.Response.JobStats> jobStats = new AtomicArray<>(closedJobIds.size()); AtomicArray<GetJobsStatsAction.Response.JobStats> jobStats = new AtomicArray<>(closedJobIds.size());
Consumer<Exception> errorHandler = e -> {
// take the first error
searchException.compareAndSet(null, e);
if (counter.decrementAndGet() == 0) {
listener.onFailure(e);
}
};
PersistentTasksCustomMetadata tasks = clusterService.state().getMetadata().custom(PersistentTasksCustomMetadata.TYPE); PersistentTasksCustomMetadata tasks = clusterService.state().getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
for (int i = 0; i < closedJobIds.size(); i++) { for (int i = 0; i < closedJobIds.size(); i++) {
int slot = i; int slot = i;
@ -159,14 +170,19 @@ public class TransportGetJobsStatsAction extends TransportTasksAction<TransportO
jobStats.set(slot, new JobStats(jobId, dataCounts, modelSizeStats, forecastStats, jobState, jobStats.set(slot, new JobStats(jobId, dataCounts, modelSizeStats, forecastStats, jobState,
null, assignmentExplanation, null, timingStats)); null, assignmentExplanation, null, timingStats));
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
if (searchException.get() != null) {
// there was an error
listener.onFailure(searchException.get());
return;
}
List<JobStats> results = response.getResponse().results(); List<JobStats> results = response.getResponse().results();
results.addAll(jobStats.asList()); results.addAll(jobStats.asList());
Collections.sort(results, Comparator.comparing(GetJobsStatsAction.Response.JobStats::getJobId)); Collections.sort(results, Comparator.comparing(GetJobsStatsAction.Response.JobStats::getJobId));
listener.onResponse(new GetJobsStatsAction.Response(response.getTaskFailures(), response.getNodeFailures(), listener.onResponse(new GetJobsStatsAction.Response(response.getTaskFailures(), response.getNodeFailures(),
new QueryPage<>(results, results.size(), Job.RESULTS_FIELD))); new QueryPage<>(results, results.size(), Job.RESULTS_FIELD)));
} }
}, listener::onFailure); }, errorHandler);
}, listener::onFailure); }, errorHandler);
} }
} }