This commit is contained in:
parent
5e48811585
commit
044a4e127a
|
@ -60,7 +60,8 @@ public final class Messages {
|
|||
public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED = "Started analytics";
|
||||
public static final String DATA_FRAME_ANALYTICS_AUDIT_STOPPED = "Stopped analytics";
|
||||
public static final String DATA_FRAME_ANALYTICS_AUDIT_DELETED = "Deleted analytics";
|
||||
public static final String DATA_FRAME_ANALYTICS_AUDIT_UPDATED_STATE = "Successfully updated analytics task state to [{0}]";
|
||||
public static final String DATA_FRAME_ANALYTICS_AUDIT_UPDATED_STATE_WITH_REASON =
|
||||
"Updated analytics task state to [{0}] with reason [{1}]";
|
||||
public static final String DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE = "Estimated memory usage for this analytics to be [{0}]";
|
||||
public static final String DATA_FRAME_ANALYTICS_AUDIT_CREATING_DEST_INDEX = "Creating destination index [{0}]";
|
||||
public static final String DATA_FRAME_ANALYTICS_AUDIT_REUSING_DEST_INDEX = "Using existing destination index [{0}]";
|
||||
|
|
|
@ -93,12 +93,12 @@ public class DataFrameAnalyticsManager {
|
|||
executeJobInMiddleOfReindexing(task, config);
|
||||
break;
|
||||
default:
|
||||
task.updateState(DataFrameAnalyticsState.FAILED, "Cannot execute analytics task [" + config.getId() +
|
||||
task.setFailed("Cannot execute analytics task [" + config.getId() +
|
||||
"] as it is in unknown state [" + currentState + "]. Must be one of [STARTED, REINDEXING, ANALYZING]");
|
||||
}
|
||||
|
||||
},
|
||||
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
|
||||
error -> task.setFailed(error.getMessage())
|
||||
);
|
||||
|
||||
// Retrieve configuration
|
||||
|
@ -122,13 +122,13 @@ public class DataFrameAnalyticsManager {
|
|||
case FIRST_TIME:
|
||||
task.updatePersistentTaskState(reindexingState, ActionListener.wrap(
|
||||
updatedTask -> reindexDataframeAndStartAnalysis(task, config),
|
||||
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
|
||||
error -> task.setFailed(error.getMessage())
|
||||
));
|
||||
break;
|
||||
case RESUMING_REINDEXING:
|
||||
task.updatePersistentTaskState(reindexingState, ActionListener.wrap(
|
||||
updatedTask -> executeJobInMiddleOfReindexing(task, config),
|
||||
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
|
||||
error -> task.setFailed(error.getMessage())
|
||||
));
|
||||
break;
|
||||
case RESUMING_ANALYZING:
|
||||
|
@ -136,7 +136,7 @@ public class DataFrameAnalyticsManager {
|
|||
break;
|
||||
case FINISHED:
|
||||
default:
|
||||
task.updateState(DataFrameAnalyticsState.FAILED, "Unexpected starting state [" + startingState + "]");
|
||||
task.setFailed("Unexpected starting state [" + startingState + "]");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -151,7 +151,7 @@ public class DataFrameAnalyticsManager {
|
|||
if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
|
||||
reindexDataframeAndStartAnalysis(task, config);
|
||||
} else {
|
||||
task.updateState(DataFrameAnalyticsState.FAILED, e.getMessage());
|
||||
task.setFailed(e.getMessage());
|
||||
}
|
||||
}
|
||||
));
|
||||
|
@ -178,7 +178,7 @@ public class DataFrameAnalyticsManager {
|
|||
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex()));
|
||||
startAnalytics(task, config);
|
||||
},
|
||||
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
|
||||
error -> task.setFailed(error.getMessage())
|
||||
);
|
||||
|
||||
// Reindex
|
||||
|
@ -244,12 +244,12 @@ public class DataFrameAnalyticsManager {
|
|||
if (ExceptionsHelper.unwrapCause(error) instanceof ResourceNotFoundException) {
|
||||
// Task has stopped
|
||||
} else {
|
||||
task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage());
|
||||
task.setFailed(error.getMessage());
|
||||
}
|
||||
}
|
||||
));
|
||||
},
|
||||
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
|
||||
error -> task.setFailed(error.getMessage())
|
||||
);
|
||||
|
||||
ActionListener<RefreshResponse> refreshListener = ActionListener.wrap(
|
||||
|
|
|
@ -177,17 +177,20 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
|
|||
}
|
||||
}
|
||||
|
||||
public void updateState(DataFrameAnalyticsState state, @Nullable String reason) {
|
||||
DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(state, getAllocationId(), reason);
|
||||
public void setFailed(String reason) {
|
||||
DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.FAILED,
|
||||
getAllocationId(), reason);
|
||||
updatePersistentTaskState(
|
||||
newTaskState,
|
||||
ActionListener.wrap(
|
||||
updatedTask -> {
|
||||
auditor.info(getParams().getId(), Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_UPDATED_STATE, state));
|
||||
LOGGER.info("[{}] Successfully update task state to [{}]", getParams().getId(), state);
|
||||
String message = Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_UPDATED_STATE_WITH_REASON,
|
||||
DataFrameAnalyticsState.FAILED, reason);
|
||||
auditor.info(getParams().getId(), message);
|
||||
LOGGER.info("[{}] {}", getParams().getId(), message);
|
||||
},
|
||||
e -> LOGGER.error(new ParameterizedMessage("[{}] Could not update task state to [{}] with reason [{}]",
|
||||
getParams().getId(), state, reason), e)
|
||||
getParams().getId(), DataFrameAnalyticsState.FAILED, reason), e)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.elasticsearch.search.SearchHit;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
|
||||
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
|
||||
import org.elasticsearch.xpack.core.ml.dataframe.analyses.DataFrameAnalysis;
|
||||
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
|
||||
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
|
||||
|
@ -110,8 +109,7 @@ public class AnalyticsProcessManager {
|
|||
return;
|
||||
}
|
||||
if (processContextByAllocation.putIfAbsent(task.getAllocationId(), processContext) != null) {
|
||||
task.updateState(
|
||||
DataFrameAnalyticsState.FAILED, "[" + config.getId() + "] Could not create process as one already exists");
|
||||
task.setFailed("[" + config.getId() + "] Could not create process as one already exists");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -193,7 +191,7 @@ public class AnalyticsProcessManager {
|
|||
task.markAsCompleted();
|
||||
} else {
|
||||
LOGGER.error("[{}] Marking task failed; {}", config.getId(), processContext.getFailureReason());
|
||||
task.updateState(DataFrameAnalyticsState.FAILED, processContext.getFailureReason());
|
||||
task.setFailed(processContext.getFailureReason());
|
||||
// Note: We are not marking the task as failed here as we want the user to be able to inspect the failure reason.
|
||||
}
|
||||
}
|
||||
|
@ -265,7 +263,7 @@ public class AnalyticsProcessManager {
|
|||
process.restoreState(state);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(new ParameterizedMessage("[{}] Failed to restore state", process.getConfig().jobId()), e);
|
||||
task.updateState(DataFrameAnalyticsState.FAILED, "Failed to restore state: " + e.getMessage());
|
||||
task.setFailed("Failed to restore state: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -14,7 +14,6 @@ import org.elasticsearch.test.ESTestCase;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
|
||||
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigTests;
|
||||
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
|
||||
import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetectionTests;
|
||||
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
|
||||
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor;
|
||||
|
@ -131,7 +130,7 @@ public class AnalyticsProcessManagerTests extends ESTestCase {
|
|||
inOrder.verify(task).getStatsHolder();
|
||||
inOrder.verify(task).isStopping();
|
||||
inOrder.verify(task).getAllocationId();
|
||||
inOrder.verify(task).updateState(DataFrameAnalyticsState.FAILED, "[config-id] Could not create process as one already exists");
|
||||
inOrder.verify(task).setFailed("[config-id] Could not create process as one already exists");
|
||||
verifyNoMoreInteractions(task);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue