[ML] Increase stop datafeed default timeout to 5 minutes (elastic/x-pack-elasticsearch#992)

Increase the timeout to give enough time for a datafeed to
stop smoothly.

This is the second step to avoid hitting the default timeout.
The first was ensuring aggregated datafeed is cancellable in
a responsive manner. The third and final step will be to
apply chunking in aggregated datafeeds in order to shorten
the duration of the search, which will make cancellation even
more responsive.

Relates elastic/x-pack-elasticsearch#803

Original commit: elastic/x-pack-elasticsearch@db642330ec
This commit is contained in:
Dimitris Athanasiou 2017-04-08 14:44:08 +01:00 committed by GitHub
parent 734243a4df
commit 5c9364d5fe
6 changed files with 33 additions and 16 deletions

View File

@ -331,7 +331,7 @@ public class StartDatafeedAction
}
public void stop(String reason) {
stop(reason, TimeValue.timeValueSeconds(20));
stop(reason, StopDatafeedAction.DEFAULT_TIMEOUT);
}
public void stop(String reason, TimeValue timeout) {

View File

@ -54,6 +54,7 @@ public class StopDatafeedAction
public static final String NAME = "cluster:admin/xpack/ml/datafeeds/stop";
public static final ParseField TIMEOUT = new ParseField("timeout");
public static final ParseField FORCE = new ParseField("force");
public static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueMinutes(5);
private StopDatafeedAction() {
super(NAME);
@ -76,7 +77,7 @@ public class StopDatafeedAction
static {
PARSER.declareString((request, datafeedId) -> request.datafeedId = datafeedId, DatafeedConfig.ID);
PARSER.declareString((request, val) ->
request.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
request.setStopTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
PARSER.declareBoolean(Request::setForce, FORCE);
}
@ -93,12 +94,12 @@ public class StopDatafeedAction
}
private String datafeedId;
private TimeValue stopTimeout = DEFAULT_TIMEOUT;
private boolean force = false;
public Request(String jobId) {
this.datafeedId = ExceptionsHelper.requireNonNull(jobId, DatafeedConfig.ID.getPreferredName());
setActions(StartDatafeedAction.NAME);
setTimeout(TimeValue.timeValueSeconds(20));
}
Request() {
@ -108,6 +109,14 @@ public class StopDatafeedAction
return datafeedId;
}
public TimeValue getStopTimeout() {
return stopTimeout;
}
public void setStopTimeout(TimeValue stopTimeout) {
this.stopTimeout = ExceptionsHelper.requireNonNull(stopTimeout, TIMEOUT.getPreferredName());
}
public boolean isForce() {
return force;
}
@ -131,6 +140,7 @@ public class StopDatafeedAction
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
datafeedId = in.readString();
stopTimeout = new TimeValue(in);
force = in.readBoolean();
}
@ -138,21 +148,21 @@ public class StopDatafeedAction
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(datafeedId);
stopTimeout.writeTo(out);
out.writeBoolean(force);
}
@Override
public int hashCode() {
return Objects.hash(datafeedId, getTimeout());
return Objects.hash(datafeedId, stopTimeout, force);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DatafeedConfig.ID.getPreferredName(), datafeedId);
if (getTimeout() != null) {
builder.field(TIMEOUT.getPreferredName(), getTimeout().getStringRep());
}
builder.field(TIMEOUT.getPreferredName(), stopTimeout.getStringRep());
builder.field(FORCE.getPreferredName(), force);
builder.endObject();
return builder;
}
@ -167,7 +177,7 @@ public class StopDatafeedAction
}
Request other = (Request) obj;
return Objects.equals(datafeedId, other.datafeedId) &&
Objects.equals(getTimeout(), other.getTimeout()) &&
Objects.equals(stopTimeout, other.stopTimeout) &&
Objects.equals(force, other.force);
}
}
@ -254,7 +264,7 @@ public class StopDatafeedAction
// This api returns when task has been cancelled, but that doesn't mean the persistent task has been removed from cluster state,
// so wait for that to happen here.
void waitForDatafeedStopped(long persistentTaskId, Request request, Response response, ActionListener<Response> listener) {
persistentTasksService.waitForPersistentTaskStatus(persistentTaskId, Objects::isNull, request.getTimeout(),
persistentTasksService.waitForPersistentTaskStatus(persistentTaskId, Objects::isNull, request.getStopTimeout(),
new WaitForPersistentTaskStatusListener<StartDatafeedAction.Request>() {
@Override
public void onResponse(PersistentTask<StartDatafeedAction.Request> task) {
@ -295,7 +305,7 @@ public class StopDatafeedAction
@Override
protected void taskOperation(Request request, StartDatafeedAction.DatafeedTask task, ActionListener<Response> listener) {
task.stop("stop_datafeed (api)", request.getTimeout());
task.stop("stop_datafeed (api)", request.getStopTimeout());
listener.onResponse(new Response(true));
}

View File

@ -193,7 +193,12 @@ class DatafeedJob {
throw new EmptyDataCountException();
}
client.execute(FlushJobAction.INSTANCE, flushRequest).actionGet();
// If the datafeed was stopped, then it is possible that by the time
// we call flush the job is closed. Thus, we don't flush unless the
// datafeed is stilll running.
if (isRunning()) {
client.execute(FlushJobAction.INSTANCE, flushRequest).actionGet();
}
}
private DataCounts postData(InputStream inputStream, XContentType xContentType)

View File

@ -319,7 +319,8 @@ public class DatafeedManager extends AbstractComponent {
FutureUtils.cancel(future);
auditor.info(datafeed.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED));
handler.accept(e);
logger.info("[{}] datafeed [{}] for job [{}] has been stopped", source, datafeed.getId(), datafeed.getJobId());
logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", source, datafeed.getId(), datafeed.getJobId(),
acquired ? "" : ", but there may be pending tasks as the timeout [" + timeout.getStringRep() + "] expired");
if (autoCloseJob) {
closeJob();
}

View File

@ -42,9 +42,9 @@ public class RestStopDatafeedAction extends BaseRestHandler {
} else {
jobDatafeedRequest = new StopDatafeedAction.Request(datafeedId);
if (restRequest.hasParam(StopDatafeedAction.TIMEOUT.getPreferredName())) {
TimeValue openTimeout = restRequest.paramAsTime(
StopDatafeedAction.TIMEOUT.getPreferredName(), TimeValue.timeValueSeconds(20));
jobDatafeedRequest.setTimeout(openTimeout);
TimeValue stopTimeout = restRequest.paramAsTime(
StopDatafeedAction.TIMEOUT.getPreferredName(), StopDatafeedAction.DEFAULT_TIMEOUT);
jobDatafeedRequest.setStopTimeout(stopTimeout);
}
if (restRequest.hasParam(StopDatafeedAction.FORCE.getPreferredName())) {
jobDatafeedRequest.setForce(

View File

@ -32,8 +32,9 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
protected Request createTestInstance() {
Request request = new Request(randomAlphaOfLengthBetween(1, 20));
if (randomBoolean()) {
request.setTimeout(TimeValue.timeValueMillis(randomNonNegativeLong()));
request.setStopTimeout(TimeValue.timeValueMillis(randomNonNegativeLong()));
}
request.setForce(randomBoolean());
return request;
}