mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-24 13:55:57 +00:00
Also unmutes the integ test that stops and restarts an outlier detection job with the hope of learning more of the failure in #55068. Backport of #55545 Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
parent
0a6c74b7d3
commit
b8379872a7
@ -595,7 +595,6 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
|
||||
"Stopped analytics");
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/55068")
|
||||
public void testOutlierDetectionStopAndRestart() throws Exception {
|
||||
String sourceIndex = "test-outlier-detection-stop-and-restart";
|
||||
|
||||
|
@ -99,12 +99,12 @@ public class DataFrameAnalyticsManager {
|
||||
executeJobInMiddleOfReindexing(task, config);
|
||||
break;
|
||||
default:
|
||||
task.setFailed("Cannot execute analytics task [" + config.getId() +
|
||||
"] as it is in unknown state [" + currentState + "]. Must be one of [STARTED, REINDEXING, ANALYZING]");
|
||||
task.setFailed(ExceptionsHelper.serverError("Cannot execute analytics task [" + config.getId() +
|
||||
"] as it is in unknown state [" + currentState + "]. Must be one of [STARTED, REINDEXING, ANALYZING]"));
|
||||
}
|
||||
|
||||
},
|
||||
error -> task.setFailed(ExceptionsHelper.unwrapCause(error).getMessage())
|
||||
task::setFailed
|
||||
);
|
||||
|
||||
// Retrieve configuration
|
||||
@ -154,13 +154,13 @@ public class DataFrameAnalyticsManager {
|
||||
case FIRST_TIME:
|
||||
task.updatePersistentTaskState(reindexingState, ActionListener.wrap(
|
||||
updatedTask -> reindexDataframeAndStartAnalysis(task, config),
|
||||
error -> task.setFailed(error.getMessage())
|
||||
task::setFailed
|
||||
));
|
||||
break;
|
||||
case RESUMING_REINDEXING:
|
||||
task.updatePersistentTaskState(reindexingState, ActionListener.wrap(
|
||||
updatedTask -> executeJobInMiddleOfReindexing(task, config),
|
||||
error -> task.setFailed(error.getMessage())
|
||||
task::setFailed
|
||||
));
|
||||
break;
|
||||
case RESUMING_ANALYZING:
|
||||
@ -168,7 +168,7 @@ public class DataFrameAnalyticsManager {
|
||||
break;
|
||||
case FINISHED:
|
||||
default:
|
||||
task.setFailed("Unexpected starting state [" + startingState + "]");
|
||||
task.setFailed(ExceptionsHelper.serverError("Unexpected starting state [" + startingState + "]"));
|
||||
}
|
||||
}
|
||||
|
||||
@ -189,7 +189,7 @@ public class DataFrameAnalyticsManager {
|
||||
if (cause instanceof IndexNotFoundException) {
|
||||
reindexDataframeAndStartAnalysis(task, config);
|
||||
} else {
|
||||
task.setFailed(cause.getMessage());
|
||||
task.setFailed(e);
|
||||
}
|
||||
}
|
||||
));
|
||||
@ -230,7 +230,7 @@ public class DataFrameAnalyticsManager {
|
||||
config.getId()), error);
|
||||
task.markAsCompleted();
|
||||
} else {
|
||||
task.setFailed(ExceptionsHelper.unwrapCause(error).getMessage());
|
||||
task.setFailed(error);
|
||||
}
|
||||
}
|
||||
);
|
||||
@ -317,12 +317,12 @@ public class DataFrameAnalyticsManager {
|
||||
if (cause instanceof ResourceNotFoundException) {
|
||||
// Task has stopped
|
||||
} else {
|
||||
task.setFailed(cause.getMessage());
|
||||
task.setFailed(error);
|
||||
}
|
||||
}
|
||||
));
|
||||
},
|
||||
error -> task.setFailed(ExceptionsHelper.unwrapCause(error).getMessage())
|
||||
task::setFailed
|
||||
);
|
||||
|
||||
ActionListener<RefreshResponse> refreshListener = ActionListener.wrap(
|
||||
|
@ -197,7 +197,9 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
|
||||
}
|
||||
}
|
||||
|
||||
public void setFailed(String reason) {
|
||||
public void setFailed(Exception error) {
|
||||
LOGGER.error(new ParameterizedMessage("[{}] Setting task to failed", taskParams.getId()), error);
|
||||
String reason = ExceptionsHelper.unwrapCause(error).getMessage();
|
||||
DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.FAILED,
|
||||
getAllocationId(), reason);
|
||||
updatePersistentTaskState(
|
||||
|
@ -116,7 +116,8 @@ public class AnalyticsProcessManager {
|
||||
return;
|
||||
}
|
||||
if (processContextByAllocation.putIfAbsent(task.getAllocationId(), processContext) != null) {
|
||||
task.setFailed("[" + config.getId() + "] Could not create process as one already exists");
|
||||
task.setFailed(ExceptionsHelper.serverError(
|
||||
"[" + config.getId() + "] Could not create process as one already exists"));
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -207,7 +208,7 @@ public class AnalyticsProcessManager {
|
||||
task.markAsCompleted();
|
||||
} else {
|
||||
LOGGER.error("[{}] Marking task failed; {}", config.getId(), processContext.getFailureReason());
|
||||
task.setFailed(processContext.getFailureReason());
|
||||
task.setFailed(ExceptionsHelper.serverError(processContext.getFailureReason()));
|
||||
// Note: We are not marking the task as failed here as we want the user to be able to inspect the failure reason.
|
||||
}
|
||||
}
|
||||
@ -285,7 +286,7 @@ public class AnalyticsProcessManager {
|
||||
process.restoreState(state);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(new ParameterizedMessage("[{}] Failed to restore state", process.getConfig().jobId()), e);
|
||||
task.setFailed("Failed to restore state: " + e.getMessage());
|
||||
task.setFailed(ExceptionsHelper.serverError("Failed to restore state: " + e.getMessage()));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -28,6 +28,7 @@ import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider;
|
||||
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
|
||||
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;
|
||||
import org.junit.Before;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.InOrder;
|
||||
|
||||
import java.util.Arrays;
|
||||
@ -139,7 +140,11 @@ public class AnalyticsProcessManagerTests extends ESTestCase {
|
||||
inOrder.verify(task).getStatsHolder();
|
||||
inOrder.verify(task).isStopping();
|
||||
inOrder.verify(task).getAllocationId();
|
||||
inOrder.verify(task).setFailed("[config-id] Could not create process as one already exists");
|
||||
|
||||
ArgumentCaptor<Exception> failureCaptor = ArgumentCaptor.forClass(Exception.class);
|
||||
inOrder.verify(task).setFailed(failureCaptor.capture());
|
||||
assertThat(failureCaptor.getValue().getMessage(), equalTo("[config-id] Could not create process as one already exists"));
|
||||
|
||||
verifyNoMoreInteractions(task);
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user