[ML] Account for the possibility of no ML metadata existing (elastic/x-pack-elasticsearch#1648)
We try to install empty ML metadata as soon as possible after startup if none exists. However, this still leaves a short gap when the cluster is active with no ML metadata. To avoid problems, functions that use the ML metadata should treat this situation as equivalent to having empty ML metadata. relates elastic/x-pack-elasticsearch#1643 Original commit: elastic/x-pack-elasticsearch@8f0e00cda8
This commit is contained in:
parent
0d2b127fd7
commit
c2575288d8
|
@ -227,14 +227,19 @@ public class MlMetadata implements MetaData.Custom {
|
|||
private TreeMap<String, DatafeedConfig> datafeeds;
|
||||
|
||||
public Builder() {
|
||||
this.jobs = new TreeMap<>();
|
||||
this.datafeeds = new TreeMap<>();
|
||||
jobs = new TreeMap<>();
|
||||
datafeeds = new TreeMap<>();
|
||||
}
|
||||
|
||||
public Builder(MlMetadata previous) {
|
||||
public Builder(@Nullable MlMetadata previous) {
|
||||
if (previous == null) {
|
||||
jobs = new TreeMap<>();
|
||||
datafeeds = new TreeMap<>();
|
||||
} else {
|
||||
jobs = new TreeMap<>(previous.jobs);
|
||||
datafeeds = new TreeMap<>(previous.datafeeds);
|
||||
}
|
||||
}
|
||||
|
||||
public Builder putJob(Job job, boolean overwrite) {
|
||||
if (jobs.containsKey(job.getId()) && overwrite == false) {
|
||||
|
|
|
@ -199,6 +199,9 @@ public class GetDatafeedsAction extends Action<GetDatafeedsAction.Request, GetDa
|
|||
|
||||
QueryPage<DatafeedConfig> response;
|
||||
MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
|
||||
if (mlMetadata == null) {
|
||||
mlMetadata = MlMetadata.EMPTY_METADATA;
|
||||
}
|
||||
if (ALL.equals(request.getDatafeedId())) {
|
||||
List<DatafeedConfig> datafeedConfigs = new ArrayList<>(mlMetadata.getDatafeeds().values());
|
||||
response = new QueryPage<>(datafeedConfigs, datafeedConfigs.size(), DatafeedConfig.RESULTS_FIELD);
|
||||
|
|
|
@ -305,6 +305,9 @@ public class GetDatafeedsStatsAction extends Action<GetDatafeedsStatsAction.Requ
|
|||
logger.debug("Get stats for datafeed '{}'", request.getDatafeedId());
|
||||
|
||||
MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
|
||||
if (mlMetadata == null) {
|
||||
mlMetadata = MlMetadata.EMPTY_METADATA;
|
||||
}
|
||||
|
||||
if (request.getDatafeedId().equals(ALL) == false
|
||||
&& mlMetadata.getDatafeed(request.getDatafeedId()) == null) {
|
||||
|
|
|
@ -370,9 +370,11 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
|
|||
|
||||
@Override
|
||||
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
|
||||
MlMetadata mlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE);
|
||||
MlMetadata clusterMlMetadata = clusterService.state().metaData().custom(MlMetadata.TYPE);
|
||||
MlMetadata mlMetadata = (clusterMlMetadata == null) ? MlMetadata.EMPTY_METADATA : clusterMlMetadata;
|
||||
|
||||
if (Job.ALL.equals(request.getJobId())) {
|
||||
request.expandedJobsIds = mlMetadata.getJobs().keySet().stream().collect(Collectors.toList());
|
||||
request.expandedJobsIds = new ArrayList<>(mlMetadata.getJobs().keySet());
|
||||
} else {
|
||||
if (mlMetadata.getJobs().containsKey(request.getJobId()) == false) {
|
||||
throw ExceptionsHelper.missingJobException(request.getJobId());
|
||||
|
|
|
@ -611,7 +611,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
|
|||
* </ul>
|
||||
*/
|
||||
static void validate(String jobId, MlMetadata mlMetadata) {
|
||||
Job job = mlMetadata.getJobs().get(jobId);
|
||||
Job job = (mlMetadata == null) ? null : mlMetadata.getJobs().get(jobId);
|
||||
if (job == null) {
|
||||
throw ExceptionsHelper.missingJobException(jobId);
|
||||
}
|
||||
|
|
|
@ -543,7 +543,7 @@ public class StartDatafeedAction
|
|||
}
|
||||
|
||||
static void validate(String datafeedId, MlMetadata mlMetadata, PersistentTasksCustomMetaData tasks) {
|
||||
DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
|
||||
DatafeedConfig datafeed = (mlMetadata == null) ? null : mlMetadata.getDatafeed(datafeedId);
|
||||
if (datafeed == null) {
|
||||
throw ExceptionsHelper.missingDatafeedException(datafeedId);
|
||||
}
|
||||
|
|
|
@ -90,6 +90,9 @@ public class DatafeedManager extends AbstractComponent {
|
|||
String datafeedId = task.getDatafeedId();
|
||||
ClusterState state = clusterService.state();
|
||||
MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
|
||||
if (mlMetadata == null) {
|
||||
mlMetadata = MlMetadata.EMPTY_METADATA;
|
||||
}
|
||||
|
||||
DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
|
||||
Job job = mlMetadata.getJobs().get(datafeed.getJobId());
|
||||
|
|
|
@ -31,7 +31,7 @@ public class DatafeedNodeSelector {
|
|||
private final IndexNameExpressionResolver resolver;
|
||||
|
||||
public DatafeedNodeSelector(ClusterState clusterState, IndexNameExpressionResolver resolver, String datafeedId) {
|
||||
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
|
||||
MlMetadata mlMetadata = Objects.requireNonNull(clusterState.metaData().custom(MlMetadata.TYPE));
|
||||
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
||||
this.datafeed = mlMetadata.getDatafeed(datafeedId);
|
||||
this.jobTask = MlMetadata.getJobTask(datafeed.getJobId(), tasks);
|
||||
|
|
|
@ -11,9 +11,7 @@ import org.elasticsearch.action.index.IndexResponse;
|
|||
import org.elasticsearch.action.support.WriteRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.ack.AckedRequest;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
|
@ -90,7 +88,7 @@ public class JobManager extends AbstractComponent {
|
|||
return getJobs(clusterState);
|
||||
}
|
||||
MlMetadata mlMetadata = clusterState.getMetaData().custom(MlMetadata.TYPE);
|
||||
Job job = mlMetadata.getJobs().get(jobId);
|
||||
Job job = (mlMetadata == null) ? null : mlMetadata.getJobs().get(jobId);
|
||||
if (job == null) {
|
||||
logger.debug(String.format(Locale.ROOT, "Cannot find job '%s'", jobId));
|
||||
throw ExceptionsHelper.missingJobException(jobId);
|
||||
|
@ -109,6 +107,9 @@ public class JobManager extends AbstractComponent {
|
|||
*/
|
||||
public QueryPage<Job> getJobs(ClusterState clusterState) {
|
||||
MlMetadata mlMetadata = clusterState.getMetaData().custom(MlMetadata.TYPE);
|
||||
if (mlMetadata == null) {
|
||||
mlMetadata = MlMetadata.EMPTY_METADATA;
|
||||
}
|
||||
List<Job> jobs = mlMetadata.getJobs().entrySet().stream()
|
||||
.map(Map.Entry::getValue)
|
||||
.collect(Collectors.toList());
|
||||
|
@ -150,7 +151,7 @@ public class JobManager extends AbstractComponent {
|
|||
*/
|
||||
public static Job getJobOrThrowIfUnknown(ClusterState clusterState, String jobId) {
|
||||
MlMetadata mlMetadata = clusterState.metaData().custom(MlMetadata.TYPE);
|
||||
Job job = mlMetadata.getJobs().get(jobId);
|
||||
Job job = (mlMetadata == null) ? null : mlMetadata.getJobs().get(jobId);
|
||||
if (job == null) {
|
||||
throw ExceptionsHelper.missingJobException(jobId);
|
||||
}
|
||||
|
@ -164,7 +165,7 @@ public class JobManager extends AbstractComponent {
|
|||
Job job = request.getJobBuilder().build(new Date());
|
||||
|
||||
MlMetadata currentMlMetadata = state.metaData().custom(MlMetadata.TYPE);
|
||||
if (currentMlMetadata.getJobs().containsKey(job.getId())) {
|
||||
if (currentMlMetadata != null && currentMlMetadata.getJobs().containsKey(job.getId())) {
|
||||
actionListener.onFailure(ExceptionsHelper.jobAlreadyExists(job.getId()));
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue