[7.x][ML] Unwrap exception causes before calling instanceof (#47676) (#47724)

When an exception is returned from another node, it may be wrapped in a
`RemoteTransportException`. In places where we handle specific exceptions
using `instanceof`, we ought to unwrap the cause first.

This commit fixes the occurrences of this issue found by searching the code
of the ML plugin.
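For illustration only (not part of the commit), a minimal, self-contained sketch of why the unwrapping matters. The class name, node name, and index message are made up; the Elasticsearch classes used are real and assumed to be on the classpath:

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.transport.RemoteTransportException;

public class UnwrapExample {
    public static void main(String[] args) {
        // A failure that travelled back from another node arrives wrapped in a
        // RemoteTransportException, with the real failure as its cause.
        Exception remote = new RemoteTransportException("node-1",
                new ResourceAlreadyExistsException("index [foo] already exists"));

        // Naive check: prints false, because the wrapper hides the real exception type.
        System.out.println(remote instanceof ResourceAlreadyExistsException);

        // Unwrapped check: prints true. This is the pattern applied throughout this commit.
        System.out.println(ExceptionsHelper.unwrapCause(remote)
                instanceof ResourceAlreadyExistsException);
    }
}

Without the unwrapping, handlers that treat `ResourceAlreadyExistsException` (or `ResourceNotFoundException`, `IndexNotFoundException`, and so on) as benign fall through to the generic failure path whenever the exception comes from a remote node.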

Backport of #47676
Dimitris Athanasiou 2019-10-08 16:02:47 +03:00 committed by GitHub
parent 9b4eec9887
commit c1b0bfd74a
31 changed files with 64 additions and 47 deletions

View File

@ -22,6 +22,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.SortedMap;
@ -84,7 +85,7 @@ public class AnnotationIndex {
e -> {
// Possible that the index was created while the request was executing,
// so we need to handle that possibility
if (e instanceof ResourceAlreadyExistsException) {
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) {
// Create the alias
createAliasListener.onResponse(true);
} else {

View File

@ -16,6 +16,7 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import java.util.Arrays;
import java.util.Collections;
@ -133,7 +134,7 @@ public final class AnomalyDetectorsIndex {
// If it was created between our last check, and this request being handled, we should add the alias
// Adding an alias that already exists is idempotent. So, no need to double check if the alias exists
// as well.
if (createIndexFailure instanceof ResourceAlreadyExistsException) {
if (ExceptionsHelper.unwrapCause(createIndexFailure) instanceof ResourceAlreadyExistsException) {
createAliasListener.onResponse(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX);
} else {
finalListener.onFailure(createIndexFailure);

View File

@ -99,4 +99,8 @@ public class ExceptionsHelper {
public static <T> T requireNonNull(T obj, ParseField paramName) {
return requireNonNull(obj, paramName.getPreferredName());
}
public static Throwable unwrapCause(Throwable t) {
return org.elasticsearch.ExceptionsHelper.unwrapCause(t);
}
}
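For reference (this note is not part of the diff): the core `org.elasticsearch.ExceptionsHelper#unwrapCause` that the new ML-side helper delegates to walks the cause chain while the throwable is an `ElasticsearchWrapperException`, the marker interface that `RemoteTransportException` implements. A rough sketch of that behaviour, simplified rather than the verbatim implementation:

import org.elasticsearch.ElasticsearchWrapperException;

final class UnwrapSketch {
    // Simplified: the real method also guards against cause cycles and overly deep chains.
    static Throwable unwrapCause(Throwable t) {
        Throwable result = t;
        while (result instanceof ElasticsearchWrapperException && result.getCause() != null) {
            result = result.getCause();
        }
        return result;
    }
}

Because `RemoteTransportException` is such a wrapper, `unwrapCause` returns the original remote failure, which is what the `instanceof` checks touched by this commit are interested in.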

View File

@ -44,6 +44,7 @@ import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
@ -468,7 +469,7 @@ public class MlConfigMigrator {
listener.onResponse(indexResponse.getResult() == DocWriteResponse.Result.CREATED);
},
e -> {
if (e instanceof VersionConflictEngineException) {
if (ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException) {
// the snapshot already exists
listener.onResponse(Boolean.TRUE);
} else {

View File

@ -341,7 +341,7 @@ public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJ
@Override
public void onFailure(Exception e) {
final int slot = counter.incrementAndGet();
if ((e instanceof ResourceNotFoundException &&
if ((ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException &&
Strings.isAllOrWildcard(new String[]{request.getJobId()})) == false) {
failures.set(slot - 1, e);
}

View File

@ -119,7 +119,7 @@ public class TransportDeleteDatafeedAction extends TransportMasterNodeAction<Del
@Override
public void onFailure(Exception e) {
if (e instanceof ResourceNotFoundException) {
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
// the task has been removed in between
listener.onResponse(true);
} else {

View File

@ -423,7 +423,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
response -> finishedHandler.onResponse(true),
e -> {
// It's not a problem for us if the index wasn't found - it's equivalent to document not found
if (e instanceof IndexNotFoundException) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
finishedHandler.onResponse(true);
} else {
finishedHandler.onFailure(e);
@ -467,7 +467,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
},
e -> {
// It's not a problem for us if the index wasn't found - it's equivalent to document not found
if (e instanceof IndexNotFoundException) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
finishedHandler.onResponse(true);
} else {
finishedHandler.onFailure(e);
@ -537,7 +537,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
@Override
public void onFailure(Exception e) {
if (e instanceof ResourceNotFoundException) {
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
normalDeleteJob(parentTaskClient, request, listener);
} else {
listener.onFailure(e);
@ -550,7 +550,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
ActionListener<KillProcessAction.Response> killJobListener = ActionListener.wrap(
response -> removePersistentTask(request.getJobId(), state, removeTaskListener),
e -> {
if (e instanceof ElasticsearchStatusException) {
if (ExceptionsHelper.unwrapCause(e) instanceof ElasticsearchStatusException) {
// Killing the process marks the task as completed so it
// may have disappeared when we get here
removePersistentTask(request.getJobId(), state, removeTaskListener);

View File

@ -232,7 +232,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
@Override
public void onFailure(Exception e) {
if (e instanceof ResourceAlreadyExistsException) {
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) {
e = new ElasticsearchStatusException("Cannot open job [" + jobParams.getJobId() +
"] because it has already been opened", RestStatus.CONFLICT, e);
}

View File

@ -68,7 +68,7 @@ public class TransportPutCalendarAction extends HandledTransportAction<PutCalend
@Override
public void onFailure(Exception e) {
if (e instanceof VersionConflictEngineException) {
if (ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException) {
listener.onFailure(ExceptionsHelper.badRequestException("Cannot create calendar with id [" +
calendar.getId() + "] as it already exists"));
} else {

View File

@ -132,7 +132,7 @@ public class TransportPutDatafeedAction extends TransportMasterNodeAction<PutDat
client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
},
e -> {
if (e instanceof IndexNotFoundException) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
indicesPrivilegesBuilder.privileges(SearchAction.NAME);
privRequest.indexPrivileges(indicesPrivilegesBuilder.build());
client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);

View File

@ -67,7 +67,7 @@ public class TransportPutFilterAction extends HandledTransportAction<PutFilterAc
@Override
public void onFailure(Exception e) {
Exception reportedException;
if (e instanceof VersionConflictEngineException) {
if (ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException) {
reportedException = new ResourceAlreadyExistsException("A filter with id [" + filter.getId()
+ "] already exists");
} else {

View File

@ -34,6 +34,7 @@ import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.utils.TypedChainTaskExecutor;
import java.io.IOException;
@ -271,7 +272,7 @@ public class TransportSetUpgradeModeAction extends TransportMasterNodeAction<Set
// If the task was removed from the node, all is well
// We handle the case of allocation_id changing later in this transport class by timing out waiting for task completion
// Consequently, if the exception is ResourceNotFoundException, continue execution; circuit break otherwise.
ex -> ex instanceof ResourceNotFoundException == false);
ex -> ExceptionsHelper.unwrapCause(ex) instanceof ResourceNotFoundException == false);
for (PersistentTask<?> task : datafeedAndJobTasks) {
chainTaskExecutor.add(

View File

@ -153,7 +153,7 @@ public class TransportStartDataFrameAnalyticsAction
@Override
public void onFailure(Exception e) {
if (e instanceof ResourceAlreadyExistsException) {
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) {
e = new ElasticsearchStatusException("Cannot start data frame analytics [" + request.getId() +
"] because it has already been started", RestStatus.CONFLICT, e);
}
@ -341,7 +341,7 @@ public class TransportStartDataFrameAnalyticsAction
}
},
e -> {
if (e instanceof IndexNotFoundException) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
listener.onResponse(startContext);
} else {
listener.onFailure(e);

View File

@ -174,7 +174,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
@Override
public void onFailure(Exception e) {
if (e instanceof ResourceAlreadyExistsException) {
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) {
logger.debug("datafeed already started", e);
e = new ElasticsearchStatusException("cannot start datafeed [" + params.getDatafeedId() +
"] because it has already been started", RestStatus.CONFLICT);

View File

@ -249,7 +249,7 @@ public class TransportStopDataFrameAnalyticsAction
});
},
e -> {
if (e instanceof ResourceNotFoundException) {
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
// the task has disappeared so must have stopped
listener.onResponse(new StopDataFrameAnalyticsAction.Response(true));
} else {

View File

@ -29,6 +29,7 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
@ -195,7 +196,7 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS
@Override
public void onFailure(Exception e) {
final int slot = counter.incrementAndGet();
if ((e instanceof ResourceNotFoundException &&
if ((ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException &&
Strings.isAllOrWildcard(new String[]{request.getDatafeedId()})) == false) {
failures.set(slot - 1, e);
}
@ -244,7 +245,7 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS
});
},
e -> {
if (e instanceof ResourceNotFoundException) {
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
// the task has disappeared so must have stopped
listener.onResponse(new StopDatafeedAction.Response(true));
} else {

View File

@ -125,7 +125,7 @@ public class TransportUpdateFilterAction extends HandledTransportAction<UpdateFi
@Override
public void onFailure(Exception e) {
Exception reportedException;
if (e instanceof VersionConflictEngineException) {
if (ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException) {
reportedException = ExceptionsHelper.conflictStatusException("Error updating filter with id [" + filter.getId()
+ "] because it was modified while the update was in progress", e);
} else {

View File

@ -148,7 +148,7 @@ public class DatafeedJobBuilder {
.size(1)
.includeInterim(false);
jobResultsProvider.bucketsViaInternalClient(jobId, latestBucketQuery, bucketsHandler, e -> {
if (e instanceof ResourceNotFoundException) {
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
QueryPage<Bucket> empty = new QueryPage<>(Collections.emptyList(), 0, Bucket.RESULT_TYPE_FIELD);
bucketsHandler.accept(empty);
} else {

View File

@ -30,6 +30,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
@ -96,7 +97,7 @@ public class DatafeedManager {
@Override
public void onFailure(Exception e) {
if (e instanceof ResourceNotFoundException) {
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
// The task was stopped in the meantime, no need to do anything
logger.info("[{}] Aborting as datafeed has been stopped", datafeedId);
} else {

View File

@ -15,6 +15,7 @@ import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.rollup.action.GetRollupIndexCapsAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory;
@ -59,9 +60,10 @@ public interface DataExtractorFactory {
}
},
e -> {
if (e instanceof IndexNotFoundException) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof IndexNotFoundException) {
listener.onFailure(new ResourceNotFoundException("datafeed [" + datafeed.getId()
+ "] cannot retrieve data because index " + ((IndexNotFoundException)e).getIndex() + " does not exist"));
+ "] cannot retrieve data because index " + ((IndexNotFoundException) cause).getIndex() + " does not exist"));
} else {
listener.onFailure(e);
}

View File

@ -74,9 +74,10 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory {
new ScrollDataExtractorFactory(client, datafeed, job, extractedFields, xContentRegistry, timingStatsReporter));
},
e -> {
if (e instanceof IndexNotFoundException) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof IndexNotFoundException) {
listener.onFailure(new ResourceNotFoundException("datafeed [" + datafeed.getId()
+ "] cannot retrieve data because index " + ((IndexNotFoundException) e).getIndex() + " does not exist"));
+ "] cannot retrieve data because index " + ((IndexNotFoundException) cause).getIndex() + " does not exist"));
} else if (e instanceof IllegalArgumentException) {
listener.onFailure(ExceptionsHelper.badRequestException("[" + datafeed.getId() + "] " + e.getMessage()));
} else {

View File

@ -133,7 +133,7 @@ public class DatafeedConfigProvider {
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap(
listener::onResponse,
e -> {
if (e instanceof VersionConflictEngineException) {
if (ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException) {
// the datafeed already exists
listener.onFailure(ExceptionsHelper.datafeedAlreadyExists(datafeedId));
} else {

View File

@ -34,6 +34,7 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory;
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
import org.elasticsearch.xpack.ml.dataframe.process.AnalyticsProcessManager;
@ -142,7 +143,7 @@ public class DataFrameAnalyticsManager {
ActionListener.wrap(
r-> reindexDataframeAndStartAnalysis(task, config),
e -> {
if (e instanceof IndexNotFoundException) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
reindexDataframeAndStartAnalysis(task, config);
} else {
task.updateState(DataFrameAnalyticsState.FAILED, e.getMessage());
@ -224,7 +225,7 @@ public class DataFrameAnalyticsManager {
));
},
e -> {
if (org.elasticsearch.ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
auditor.info(
config.getId(),
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_CREATING_DEST_INDEX, config.getDest().getIndex()));
@ -260,7 +261,7 @@ public class DataFrameAnalyticsManager {
}
}),
error -> {
if (error instanceof ResourceNotFoundException) {
if (ExceptionsHelper.unwrapCause(error) instanceof ResourceNotFoundException) {
// Task has stopped
} else {
task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage());

View File

@ -162,7 +162,7 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
}
// There is a chance that the task is finished by the time we cancel it in which case we'll get
// a ResourceNotFoundException which we can ignore.
if (firstError != null && firstError instanceof ResourceNotFoundException == false) {
if (firstError != null && ExceptionsHelper.unwrapCause(firstError) instanceof ResourceNotFoundException == false) {
throw ExceptionsHelper.serverError("[" + taskParams.getId() + "] Error cancelling reindex task", firstError);
} else {
LOGGER.debug("[{}] Reindex task was successfully cancelled", taskParams.getId());
@ -215,7 +215,7 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
listener.onResponse(progress);
},
error -> {
if (error instanceof ResourceNotFoundException) {
if (ExceptionsHelper.unwrapCause(error) instanceof ResourceNotFoundException) {
// The task is not present which means either it has not started yet or it finished.
// We keep track of whether the task has finished so we can use that to tell whether the progress is 100.
listener.onResponse(isReindexingFinished ? 100 : 0);

View File

@ -24,6 +24,7 @@ import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedFields;
@ -219,7 +220,7 @@ public class DataFrameDataExtractorFactory {
docValueFieldsLimitListener.onResponse(minDocValueFieldsLimit);
},
e -> {
if (e instanceof IndexNotFoundException) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
docValueFieldsLimitListener.onFailure(new ResourceNotFoundException("cannot retrieve data because index "
+ ((IndexNotFoundException) e).getIndex() + " does not exist"));
} else {

View File

@ -99,7 +99,7 @@ public class DataFrameAnalyticsConfigProvider {
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap(
listener::onResponse,
e -> {
if (e instanceof VersionConflictEngineException) {
if (ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException) {
listener.onFailure(ExceptionsHelper.dataFrameAnalyticsAlreadyExists(id));
} else {
listener.onFailure(e);

View File

@ -36,6 +36,7 @@ import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig;
import org.elasticsearch.xpack.core.ml.inference.persistence.InferenceIndexConstants;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.io.InputStream;
@ -72,7 +73,7 @@ public class TrainedModelProvider {
trainedModelConfig.getModelId(),
trainedModelConfig.getModelVersion()),
e);
if (e instanceof VersionConflictEngineException) {
if (ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException) {
listener.onFailure(new ResourceAlreadyExistsException(
Messages.getMessage(Messages.INFERENCE_TRAINED_MODEL_EXISTS,
trainedModelConfig.getModelId(), trainedModelConfig.getModelVersion())));

View File

@ -142,7 +142,7 @@ public class JobManager {
jobConfigProvider.getJob(jobId, ActionListener.wrap(
r -> jobListener.onResponse(r.build()), // TODO JIndex we shouldn't be building the job here
e -> {
if (e instanceof ResourceNotFoundException) {
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
// Try to get the job from the cluster state
getJobFromClusterState(jobId, jobListener);
} else {
@ -272,7 +272,7 @@ public class JobManager {
@Override
public void onFailure(Exception e) {
if (e instanceof IllegalArgumentException) {
if (ExceptionsHelper.unwrapCause(e) instanceof IllegalArgumentException) {
// the underlying error differs depending on which way around the clashing fields are seen
Matcher matcher = Pattern.compile("(?:mapper|Can't merge a non object mapping) \\[(.*)\\] (?:of different type, " +
"current_type \\[.*\\], merged_type|with an object mapping) \\[.*\\]").matcher(e.getMessage());

View File

@ -135,13 +135,14 @@ public class UpdateJobProcessNotifier {
@Override
public void onFailure(Exception e) {
if (e instanceof ResourceNotFoundException) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof ResourceNotFoundException) {
logger.debug("Remote job [{}] not updated as it has been deleted", update.getJobId());
} else if (e.getMessage().contains("because job [" + update.getJobId() + "] is not open")
&& e instanceof ElasticsearchStatusException) {
} else if (cause.getMessage().contains("because job [" + update.getJobId() + "] is not open")
&& cause instanceof ElasticsearchStatusException) {
logger.debug("Remote job [{}] not updated as it is no longer open", update.getJobId());
} else {
logger.error("Failed to update remote job [" + update.getJobId() + "]", e);
logger.error("Failed to update remote job [" + update.getJobId() + "]", cause);
}
updateHolder.listener.onFailure(e);
executeProcessUpdates(updatesIterator);

View File

@ -130,7 +130,7 @@ public class JobConfigProvider {
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap(
listener::onResponse,
e -> {
if (e instanceof VersionConflictEngineException) {
if (ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException) {
// the job already exists
listener.onFailure(ExceptionsHelper.jobAlreadyExists(job.getId()));
} else {
@ -415,7 +415,7 @@ public class JobConfigProvider {
* For the list of job Ids find all that match existing jobs Ids.
* The response is all the job Ids in {@code ids} that match an existing
* job Id.
* @param ids Job Ids to find
* @param ids Job Ids to find
* @param listener The matched Ids listener
*/
public void jobIdMatches(List<String> ids, ActionListener<List<String>> listener) {

View File

@ -304,7 +304,7 @@ public class JobResultsProvider {
e -> {
// Possible that the index was created while the request was executing,
// so we need to handle that possibility
if (e instanceof ResourceAlreadyExistsException) {
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) {
LOGGER.info("Index already exists");
// Add the term field mappings and alias. The complication is that the state at the
// beginning of the operation doesn't have any knowledge of the index, as it's only
@ -1189,7 +1189,7 @@ public class JobResultsProvider {
.sortDescending(true).from(BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE - 1).size(1)
.includeInterim(false);
bucketsViaInternalClient(jobId, bucketQuery, bucketHandler, e -> {
if (e instanceof ResourceNotFoundException) {
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
handler.accept(0L);
} else {
errorHandler.accept(e);
@ -1437,7 +1437,7 @@ public class JobResultsProvider {
@Override
public void onFailure(Exception e) {
if (e instanceof IndexNotFoundException) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
listener.onFailure(new ResourceNotFoundException("No calendar with id [" + calendarId + "]"));
} else {
listener.onFailure(e);