Add jobid to job response (elastic/elasticsearch#434)
* Add jobId to job info responses * Remove getJobId() accessor from job - use getId() Original commit: elastic/x-pack-elasticsearch@faacef1217
This commit is contained in:
parent
f0a968292a
commit
2fdf848df5
|
@ -62,6 +62,12 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
|
||||||
public static final GetJobsAction INSTANCE = new GetJobsAction();
|
public static final GetJobsAction INSTANCE = new GetJobsAction();
|
||||||
public static final String NAME = "cluster:admin/prelert/jobs/get";
|
public static final String NAME = "cluster:admin/prelert/jobs/get";
|
||||||
|
|
||||||
|
private static final String CONFIG = "config";
|
||||||
|
private static final String DATA_COUNTS = "data_counts";
|
||||||
|
private static final String MODEL_SIZE_STATS = "model_size_stats";
|
||||||
|
private static final String SCHEDULER_STATE = "scheduler_state";
|
||||||
|
private static final String STATUS = "status";
|
||||||
|
|
||||||
private GetJobsAction() {
|
private GetJobsAction() {
|
||||||
super(NAME);
|
super(NAME);
|
||||||
}
|
}
|
||||||
|
@ -168,11 +174,11 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
|
||||||
if (stats.contains("_all")) {
|
if (stats.contains("_all")) {
|
||||||
all();
|
all();
|
||||||
} else {
|
} else {
|
||||||
config(stats.contains("config"));
|
config(stats.contains(CONFIG));
|
||||||
dataCounts(stats.contains("data_counts"));
|
dataCounts(stats.contains(DATA_COUNTS));
|
||||||
modelSizeStats(stats.contains("model_size_stats"));
|
modelSizeStats(stats.contains(MODEL_SIZE_STATS));
|
||||||
schedulerStatus(stats.contains("scheduler_state"));
|
schedulerStatus(stats.contains(SCHEDULER_STATE));
|
||||||
status(stats.contains("status"));
|
status(stats.contains(STATUS));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -248,6 +254,7 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
|
||||||
public static class Response extends ActionResponse implements StatusToXContent {
|
public static class Response extends ActionResponse implements StatusToXContent {
|
||||||
|
|
||||||
public static class JobInfo implements ToXContent, Writeable {
|
public static class JobInfo implements ToXContent, Writeable {
|
||||||
|
private final String jobId;
|
||||||
@Nullable
|
@Nullable
|
||||||
private Job jobConfig;
|
private Job jobConfig;
|
||||||
@Nullable
|
@Nullable
|
||||||
|
@ -261,8 +268,9 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
JobInfo(@Nullable Job job, @Nullable DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats,
|
JobInfo(String jobId, @Nullable Job job, @Nullable DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats,
|
||||||
@Nullable SchedulerState schedulerStatus, @Nullable JobStatus status) {
|
@Nullable SchedulerState schedulerStatus, @Nullable JobStatus status) {
|
||||||
|
this.jobId = jobId;
|
||||||
this.jobConfig = job;
|
this.jobConfig = job;
|
||||||
this.dataCounts = dataCounts;
|
this.dataCounts = dataCounts;
|
||||||
this.modelSizeStats = modelSizeStats;
|
this.modelSizeStats = modelSizeStats;
|
||||||
|
@ -271,6 +279,7 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
|
||||||
}
|
}
|
||||||
|
|
||||||
JobInfo(StreamInput in) throws IOException {
|
JobInfo(StreamInput in) throws IOException {
|
||||||
|
jobId = in.readString();
|
||||||
jobConfig = in.readOptionalWriteable(Job::new);
|
jobConfig = in.readOptionalWriteable(Job::new);
|
||||||
dataCounts = in.readOptionalWriteable(DataCounts::new);
|
dataCounts = in.readOptionalWriteable(DataCounts::new);
|
||||||
modelSizeStats = in.readOptionalWriteable(ModelSizeStats::new);
|
modelSizeStats = in.readOptionalWriteable(ModelSizeStats::new);
|
||||||
|
@ -278,6 +287,10 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
|
||||||
status = in.readOptionalWriteable(JobStatus::fromStream);
|
status = in.readOptionalWriteable(JobStatus::fromStream);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getJobid() {
|
||||||
|
return jobId;
|
||||||
|
}
|
||||||
|
|
||||||
public Job getJobConfig() {
|
public Job getJobConfig() {
|
||||||
return jobConfig;
|
return jobConfig;
|
||||||
}
|
}
|
||||||
|
@ -301,20 +314,21 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
|
||||||
@Override
|
@Override
|
||||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||||
builder.startObject();
|
builder.startObject();
|
||||||
|
builder.field(Job.ID.getPreferredName(), jobId);
|
||||||
if (jobConfig != null) {
|
if (jobConfig != null) {
|
||||||
builder.field("config", jobConfig);
|
builder.field(CONFIG, jobConfig);
|
||||||
}
|
}
|
||||||
if (dataCounts != null) {
|
if (dataCounts != null) {
|
||||||
builder.field("data_counts", dataCounts);
|
builder.field(DATA_COUNTS, dataCounts);
|
||||||
}
|
}
|
||||||
if (modelSizeStats != null) {
|
if (modelSizeStats != null) {
|
||||||
builder.field("model_size_stats", modelSizeStats);
|
builder.field(MODEL_SIZE_STATS, modelSizeStats);
|
||||||
}
|
}
|
||||||
if (schedulerState != null) {
|
if (schedulerState != null) {
|
||||||
builder.field("scheduler_state", schedulerState);
|
builder.field(SCHEDULER_STATE, schedulerState);
|
||||||
}
|
}
|
||||||
if (status != null) {
|
if (status != null) {
|
||||||
builder.field("status", status);
|
builder.field(STATUS, status);
|
||||||
}
|
}
|
||||||
builder.endObject();
|
builder.endObject();
|
||||||
|
|
||||||
|
@ -323,6 +337,7 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void writeTo(StreamOutput out) throws IOException {
|
public void writeTo(StreamOutput out) throws IOException {
|
||||||
|
out.writeString(jobId);
|
||||||
out.writeOptionalWriteable(jobConfig);
|
out.writeOptionalWriteable(jobConfig);
|
||||||
out.writeOptionalWriteable(dataCounts);
|
out.writeOptionalWriteable(dataCounts);
|
||||||
out.writeOptionalWriteable(modelSizeStats);
|
out.writeOptionalWriteable(modelSizeStats);
|
||||||
|
@ -332,7 +347,7 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return Objects.hash(jobConfig, dataCounts, modelSizeStats, schedulerState, status);
|
return Objects.hash(jobId, jobConfig, dataCounts, modelSizeStats, schedulerState, status);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -344,7 +359,8 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
JobInfo other = (JobInfo) obj;
|
JobInfo other = (JobInfo) obj;
|
||||||
return Objects.equals(jobConfig, other.jobConfig)
|
return Objects.equals(jobId, other.jobId)
|
||||||
|
&& Objects.equals(jobConfig, other.jobConfig)
|
||||||
&& Objects.equals(this.dataCounts, other.dataCounts)
|
&& Objects.equals(this.dataCounts, other.dataCounts)
|
||||||
&& Objects.equals(this.modelSizeStats, other.modelSizeStats)
|
&& Objects.equals(this.modelSizeStats, other.modelSizeStats)
|
||||||
&& Objects.equals(this.schedulerState, other.schedulerState)
|
&& Objects.equals(this.schedulerState, other.schedulerState)
|
||||||
|
@ -474,7 +490,8 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
|
||||||
SchedulerState schedulerStatus = readSchedulerState(request.schedulerStatus(), request.getJobId());
|
SchedulerState schedulerStatus = readSchedulerState(request.schedulerStatus(), request.getJobId());
|
||||||
JobStatus jobStatus = readJobStatus(request.status(), request.getJobId());
|
JobStatus jobStatus = readJobStatus(request.status(), request.getJobId());
|
||||||
|
|
||||||
Response.JobInfo jobInfo = new Response.JobInfo(jobConfig, dataCounts, modelSizeStats, schedulerStatus, jobStatus);
|
Response.JobInfo jobInfo = new Response.JobInfo(
|
||||||
|
request.getJobId(), jobConfig, dataCounts, modelSizeStats, schedulerStatus, jobStatus);
|
||||||
response = new QueryPage<>(Collections.singletonList(jobInfo), 1, Job.RESULTS_FIELD);
|
response = new QueryPage<>(Collections.singletonList(jobInfo), 1, Job.RESULTS_FIELD);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
@ -483,11 +500,12 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
|
||||||
List<Response.JobInfo> jobInfoList = new ArrayList<>();
|
List<Response.JobInfo> jobInfoList = new ArrayList<>();
|
||||||
for (Job job : jobsPage.results()) {
|
for (Job job : jobsPage.results()) {
|
||||||
Job jobConfig = request.config() ? job : null;
|
Job jobConfig = request.config() ? job : null;
|
||||||
DataCounts dataCounts = readDataCounts(request.dataCounts(), job.getJobId());
|
DataCounts dataCounts = readDataCounts(request.dataCounts(), job.getId());
|
||||||
ModelSizeStats modelSizeStats = readModelSizeStats(request.modelSizeStats(), job.getJobId());
|
ModelSizeStats modelSizeStats = readModelSizeStats(request.modelSizeStats(), job.getId());
|
||||||
SchedulerState schedulerStatus = readSchedulerState(request.schedulerStatus(), job.getJobId());
|
SchedulerState schedulerStatus = readSchedulerState(request.schedulerStatus(), job.getId());
|
||||||
JobStatus jobStatus = readJobStatus(request.status(), job.getJobId());
|
JobStatus jobStatus = readJobStatus(request.status(), job.getId());
|
||||||
Response.JobInfo jobInfo = new Response.JobInfo(jobConfig, dataCounts, modelSizeStats, schedulerStatus, jobStatus);
|
Response.JobInfo jobInfo = new Response.JobInfo(job.getId(), jobConfig, dataCounts, modelSizeStats,
|
||||||
|
schedulerStatus, jobStatus);
|
||||||
jobInfoList.add(jobInfo);
|
jobInfoList.add(jobInfo);
|
||||||
}
|
}
|
||||||
response = new QueryPage<>(jobInfoList, jobsPage.count(), Job.RESULTS_FIELD);
|
response = new QueryPage<>(jobInfoList, jobsPage.count(), Job.RESULTS_FIELD);
|
||||||
|
|
|
@ -202,8 +202,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the Job Id. This name is preferred when serialising to the REST
|
* Return the Job Id.
|
||||||
* API.
|
|
||||||
*
|
*
|
||||||
* @return The job Id string
|
* @return The job Id string
|
||||||
*/
|
*/
|
||||||
|
@ -211,16 +210,6 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContent
|
||||||
return jobId;
|
return jobId;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Return the Job Id. This name is preferred when serialising to the data
|
|
||||||
* store.
|
|
||||||
*
|
|
||||||
* @return The job Id string
|
|
||||||
*/
|
|
||||||
public String getJobId() {
|
|
||||||
return jobId;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The job description
|
* The job description
|
||||||
*
|
*
|
||||||
|
|
|
@ -131,8 +131,8 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
|
||||||
// A TP with no queue, so that we fail immediately if there are no threads available
|
// A TP with no queue, so that we fail immediately if there are no threads available
|
||||||
ExecutorService executorService = threadPool.executor(PrelertPlugin.AUTODETECT_PROCESS_THREAD_POOL_NAME);
|
ExecutorService executorService = threadPool.executor(PrelertPlugin.AUTODETECT_PROCESS_THREAD_POOL_NAME);
|
||||||
|
|
||||||
UsageReporter usageReporter = new UsageReporter(settings, job.getJobId(), usagePersister);
|
UsageReporter usageReporter = new UsageReporter(settings, job.getId(), usagePersister);
|
||||||
StatusReporter statusReporter = new StatusReporter(threadPool, settings, job.getJobId(),
|
StatusReporter statusReporter = new StatusReporter(threadPool, settings, job.getId(),
|
||||||
jobProvider.dataCounts(jobId), usageReporter, jobDataCountsPersister);
|
jobProvider.dataCounts(jobId), usageReporter, jobDataCountsPersister);
|
||||||
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(new NoOpRenormaliser(), jobResultsPersister, parser);
|
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(new NoOpRenormaliser(), jobResultsPersister, parser);
|
||||||
|
|
||||||
|
|
|
@ -52,7 +52,7 @@ public class AutodetectCommunicator implements Closeable {
|
||||||
|
|
||||||
public AutodetectCommunicator(ExecutorService autoDetectExecutor, Job job, AutodetectProcess process, StatusReporter statusReporter,
|
public AutodetectCommunicator(ExecutorService autoDetectExecutor, Job job, AutodetectProcess process, StatusReporter statusReporter,
|
||||||
AutoDetectResultProcessor autoDetectResultProcessor, StateProcessor stateProcessor) {
|
AutoDetectResultProcessor autoDetectResultProcessor, StateProcessor stateProcessor) {
|
||||||
this.jobId = job.getJobId();
|
this.jobId = job.getId();
|
||||||
this.autodetectProcess = process;
|
this.autodetectProcess = process;
|
||||||
this.statusReporter = statusReporter;
|
this.statusReporter = statusReporter;
|
||||||
this.autoDetectResultProcessor = autoDetectResultProcessor;
|
this.autoDetectResultProcessor = autoDetectResultProcessor;
|
||||||
|
|
|
@ -76,7 +76,7 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
|
||||||
// until restore it is done before we can accept data.
|
// until restore it is done before we can accept data.
|
||||||
executorService.execute(() -> {
|
executorService.execute(() -> {
|
||||||
try (OutputStream r = processPipes.getRestoreStream().get()) {
|
try (OutputStream r = processPipes.getRestoreStream().get()) {
|
||||||
jobProvider.restoreStateToStream(job.getJobId(), modelSnapshot, r);
|
jobProvider.restoreStateToStream(job.getId(), modelSnapshot, r);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOGGER.error("Error restoring model state for job " + job.getId(), e);
|
LOGGER.error("Error restoring model state for job " + job.getId(), e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -93,7 +93,7 @@ public class ScheduledJobService extends AbstractComponent {
|
||||||
holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
|
holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
|
||||||
} catch (ScheduledJob.EmptyDataCountException e) {
|
} catch (ScheduledJob.EmptyDataCountException e) {
|
||||||
if (holder.problemTracker.updateEmptyDataCount(true)) {
|
if (holder.problemTracker.updateEmptyDataCount(true)) {
|
||||||
requestStopping(job.getJobId());
|
requestStopping(job.getId());
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("Failed lookback import for job[" + job.getId() + "]", e);
|
logger.error("Failed lookback import for job[" + job.getId() + "]", e);
|
||||||
|
@ -162,13 +162,13 @@ public class ScheduledJobService extends AbstractComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
Holder createJobScheduler(Job job) {
|
Holder createJobScheduler(Job job) {
|
||||||
Auditor auditor = jobProvider.audit(job.getJobId());
|
Auditor auditor = jobProvider.audit(job.getId());
|
||||||
Duration frequency = getFrequencyOrDefault(job);
|
Duration frequency = getFrequencyOrDefault(job);
|
||||||
Duration queryDelay = Duration.ofSeconds(job.getSchedulerConfig().getQueryDelay());
|
Duration queryDelay = Duration.ofSeconds(job.getSchedulerConfig().getQueryDelay());
|
||||||
DataExtractor dataExtractor = dataExtractorFactory.newExtractor(job);
|
DataExtractor dataExtractor = dataExtractorFactory.newExtractor(job);
|
||||||
ScheduledJob scheduledJob = new ScheduledJob(job.getJobId(), frequency.toMillis(), queryDelay.toMillis(),
|
ScheduledJob scheduledJob = new ScheduledJob(job.getId(), frequency.toMillis(), queryDelay.toMillis(),
|
||||||
dataExtractor, dataProcessor, auditor, currentTimeSupplier, getLatestFinalBucketEndTimeMs(job),
|
dataExtractor, dataProcessor, auditor, currentTimeSupplier, getLatestFinalBucketEndTimeMs(job),
|
||||||
getLatestRecordTimestamp(job.getJobId()));
|
getLatestRecordTimestamp(job.getId()));
|
||||||
return new Holder(scheduledJob, new ProblemTracker(() -> auditor));
|
return new Holder(scheduledJob, new ProblemTracker(() -> auditor));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -97,7 +97,7 @@ public class GetJobActionResponseTests extends AbstractStreamableTestCase<GetJob
|
||||||
jobStatus = randomFrom(EnumSet.allOf(JobStatus.class));
|
jobStatus = randomFrom(EnumSet.allOf(JobStatus.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
Response.JobInfo jobInfo = new Response.JobInfo(job, dataCounts, sizeStats, schedulerState, jobStatus);
|
Response.JobInfo jobInfo = new Response.JobInfo(jobId, job, dataCounts, sizeStats, schedulerState, jobStatus);
|
||||||
jobInfoList.add(jobInfo);
|
jobInfoList.add(jobInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -64,7 +64,7 @@ public class JobManagerTests extends ESTestCase {
|
||||||
.metaData(MetaData.builder().putCustom(PrelertMetadata.TYPE, builder.build())).build();
|
.metaData(MetaData.builder().putCustom(PrelertMetadata.TYPE, builder.build())).build();
|
||||||
QueryPage<Job> doc = jobManager.getJob("foo", clusterState);
|
QueryPage<Job> doc = jobManager.getJob("foo", clusterState);
|
||||||
assertTrue(doc.count() > 0);
|
assertTrue(doc.count() > 0);
|
||||||
assertThat(doc.results().get(0).getJobId(), equalTo("foo"));
|
assertThat(doc.results().get(0).getId(), equalTo("foo"));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testFilter() {
|
public void testFilter() {
|
||||||
|
|
Loading…
Reference in New Issue