When timeout has been reached, check one more time if the job / datafeed status has the expected value.

Decreased wait timeout from 30s to 20s

Original commit: elastic/x-pack-elasticsearch@b46fb0abe3
This commit is contained in:
Martijn van Groningen 2017-01-25 23:32:04 +01:00
parent 86291c12e2
commit 3a36f94a4a
8 changed files with 39 additions and 14 deletions

View File

@ -65,7 +65,7 @@ public class CloseJobAction extends Action<CloseJobAction.Request, CloseJobActio
public static class Request extends ActionRequest {
private String jobId;
private TimeValue closeTimeout = TimeValue.timeValueMinutes(30);
private TimeValue closeTimeout = TimeValue.timeValueMinutes(20);
Request() {}

View File

@ -58,7 +58,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
private String jobId;
private boolean ignoreDowntime;
private TimeValue openTimeout = TimeValue.timeValueSeconds(30);
private TimeValue openTimeout = TimeValue.timeValueSeconds(20);
public Request(String jobId) {
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());

View File

@ -87,7 +87,7 @@ public class StartDatafeedAction
private String datafeedId;
private long startTime;
private Long endTime;
private TimeValue startTimeout = TimeValue.timeValueSeconds(30);
private TimeValue startTimeout = TimeValue.timeValueSeconds(20);
public Request(String datafeedId, long startTime) {
this.datafeedId = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName());
@ -258,7 +258,6 @@ public class StartDatafeedAction
} else {
listener.onResponse(new Response(true));
}
});
}
}

View File

@ -67,7 +67,7 @@ public class StopDatafeedAction
public static class Request extends ActionRequest {
private String datafeedId;
private TimeValue stopTimeout = TimeValue.timeValueSeconds(30);
private TimeValue stopTimeout = TimeValue.timeValueSeconds(20);
public Request(String jobId) {
this.datafeedId = ExceptionsHelper.requireNonNull(jobId, DatafeedConfig.ID.getPreferredName());

View File

@ -13,6 +13,7 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import java.io.IOException;
import java.util.Objects;
@ -91,4 +92,19 @@ public class Datafeed extends AbstractDiffable<Datafeed> implements ToXContent {
public int hashCode() {
return Objects.hash(config, status);
}
// Class already extends from AbstractDiffable, so copied from ToXContentToBytes#toString()
@SuppressWarnings("deprecation")
@Override
public final String toString() {
try {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.prettyPrint();
toXContent(builder, EMPTY_PARAMS);
return builder.string();
} catch (Exception e) {
// So we have a stack trace logged somewhere
return "{ \"error\" : \"" + org.elasticsearch.ExceptionsHelper.detailedMessage(e) + "\"}";
}
}
}

View File

@ -126,7 +126,7 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
return Objects.hash(nodeId, jobId, ignoreDowntime, status, statusReason);
}
// Class alreadt extends from AbstractDiffable, so copied from ToXContentToBytes#toString()
// Class already extends from AbstractDiffable, so copied from ToXContentToBytes#toString()
@SuppressWarnings("deprecation")
@Override
public final String toString() {

View File

@ -34,6 +34,7 @@ public class DatafeedStatusObserver {
public void waitForStatus(String datafeedId, TimeValue waitTimeout, DatafeedStatus expectedStatus, Consumer<Exception> handler) {
ClusterStateObserver observer =
new ClusterStateObserver(clusterService, LOGGER, threadPool.getThreadContext());
DatafeedPredicate datafeedPredicate = new DatafeedPredicate(datafeedId, expectedStatus);
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
@ -49,11 +50,15 @@ public class DatafeedStatusObserver {
@Override
public void onTimeout(TimeValue timeout) {
if (datafeedPredicate.test(clusterService.state())) {
handler.accept(null);
} else {
Exception e = new IllegalArgumentException("Timeout expired while waiting for datafeed status to change to ["
+ expectedStatus + "]");
handler.accept(e);
}
}, new DatafeedPredicate(datafeedId, expectedStatus), waitTimeout);
}
}, datafeedPredicate, waitTimeout);
}
private static class DatafeedPredicate implements Predicate<ClusterState> {

View File

@ -34,6 +34,7 @@ public class JobStatusObserver {
public void waitForStatus(String jobId, TimeValue waitTimeout, JobStatus expectedStatus, Consumer<Exception> handler) {
ClusterStateObserver observer =
new ClusterStateObserver(clusterService, LOGGER, threadPool.getThreadContext());
JobStatusPredicate jobStatusPredicate = new JobStatusPredicate(jobId, expectedStatus);
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
@ -49,11 +50,15 @@ public class JobStatusObserver {
@Override
public void onTimeout(TimeValue timeout) {
if (jobStatusPredicate.test(clusterService.state())) {
handler.accept(null);
} else {
Exception e = new IllegalArgumentException("Timeout expired while waiting for job status to change to ["
+ expectedStatus + "]");
handler.accept(e);
}
}, new JobStatusPredicate(jobId, expectedStatus), waitTimeout);
}
}, jobStatusPredicate, waitTimeout);
}
private static class JobStatusPredicate implements Predicate<ClusterState> {