wait for the allocation to be added before sending data

Original commit: elastic/x-pack-elasticsearch@504a75a2f2
This commit is contained in:
Martijn van Groningen 2016-11-24 11:49:44 +01:00
parent 02c755bfbe
commit aa53e7177a
2 changed files with 40 additions and 2 deletions

View File

@ -245,7 +245,7 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
public static class Response extends ActionResponse implements StatusToXContent {
static class JobInfo implements ToXContent, Writeable {
public static class JobInfo implements ToXContent, Writeable {
@Nullable
private Job jobConfig;
@Nullable
@ -276,6 +276,26 @@ public class GetJobsAction extends Action<GetJobsAction.Request, GetJobsAction.R
status = in.readOptionalWriteable(JobStatus::fromStream);
}
public Job getJobConfig() {
return jobConfig;
}
public DataCounts getDataCounts() {
return dataCounts;
}
public ModelSizeStats getModelSizeStats() {
return modelSizeStats;
}
public SchedulerState getSchedulerState() {
return schedulerState;
}
public JobStatus getStatus() {
return status;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();

View File

@ -13,6 +13,7 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.prelert.PrelertPlugin;
import org.elasticsearch.xpack.prelert.action.GetJobsAction;
import org.elasticsearch.xpack.prelert.action.PostDataAction;
import org.elasticsearch.xpack.prelert.action.PutJobAction;
import org.elasticsearch.xpack.prelert.action.ScheduledJobsIT;
@ -20,10 +21,12 @@ import org.elasticsearch.xpack.prelert.job.AnalysisConfig;
import org.elasticsearch.xpack.prelert.job.DataDescription;
import org.elasticsearch.xpack.prelert.job.Detector;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.junit.After;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
@ESIntegTestCase.ClusterScope(numDataNodes = 1)
public class TooManyJobsIT extends ESIntegTestCase {
@ -51,6 +54,19 @@ public class TooManyJobsIT extends ESIntegTestCase {
PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build(true));
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
assertTrue(putJobResponse.isAcknowledged());
assertBusy(() -> {
try {
GetJobsAction.Request getJobRequest = new GetJobsAction.Request();
getJobRequest.setJobId(job.getId());
getJobRequest.status(true);
GetJobsAction.Response response = client().execute(GetJobsAction.INSTANCE, getJobRequest).get();
GetJobsAction.Response.JobInfo jobInfo = response.getResponse().results().get(0);
assertNotNull(jobInfo);
assertEquals(JobStatus.CLOSED, jobInfo.getStatus());
} catch (Exception e) {
fail("failure " + e.getMessage());
}
});
// triggers creating autodetect process:
PostDataAction.Request postDataRequest = new PostDataAction.Request(job.getId());
@ -60,8 +76,10 @@ public class TooManyJobsIT extends ESIntegTestCase {
assertEquals(1, postDataResponse.getDataCounts().getInputRecordCount());
logger.info("Posted data {} times", i);
} catch (Exception e) {
logger.info("Handling exception", e);
Throwable cause = ExceptionsHelper.unwrapCause(e.getCause());
if (ElasticsearchStatusException.class.equals(cause.getClass()) == false) {
logger.warn("Unexpected cause", e);
}
assertEquals(ElasticsearchStatusException.class, cause.getClass());
assertEquals(RestStatus.TOO_MANY_REQUESTS, ((ElasticsearchStatusException) cause).status());
logger.info("good news everybody --> reached threadpool capacity after starting {}th analytical process", i);