Remove the ignoreDowntime parameter from the _data endpoint (elastic/elasticsearch#834)

The parameter only applies when a job is opened

Original commit: elastic/x-pack-elasticsearch@37b902aa2a
This commit is contained in:
David Kyle 2017-01-31 11:48:58 +00:00 committed by GitHub
parent 34274a30ed
commit 97970b94cd
11 changed files with 19 additions and 40 deletions

View File

@ -16,6 +16,7 @@ import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -56,6 +57,8 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
public static class Request extends ActionRequest {
public static final ParseField IGNORE_DOWNTIME = new ParseField("ignore_downtime");
private String jobId;
private boolean ignoreDowntime;
private TimeValue openTimeout = TimeValue.timeValueSeconds(20);

View File

@ -130,11 +130,9 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
public static class Request extends TransportJobTaskAction.JobTaskRequest<Request> {
public static final ParseField IGNORE_DOWNTIME = new ParseField("ignore_downtime");
public static final ParseField RESET_START = new ParseField("reset_start");
public static final ParseField RESET_END = new ParseField("reset_end");
private boolean ignoreDowntime = false;
private String resetStart = "";
private String resetEnd = "";
private DataDescription dataDescription;
@ -151,14 +149,6 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
return jobId;
}
public boolean isIgnoreDowntime() {
return ignoreDowntime;
}
public void setIgnoreDowntime(boolean ignoreDowntime) {
this.ignoreDowntime = ignoreDowntime;
}
public String getResetStart() {
return resetStart;
}
@ -192,7 +182,6 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
ignoreDowntime = in.readBoolean();
resetStart = in.readOptionalString();
resetEnd = in.readOptionalString();
dataDescription = in.readOptionalWriteable(DataDescription::new);
@ -202,7 +191,6 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(ignoreDowntime);
out.writeOptionalString(resetStart);
out.writeOptionalString(resetEnd);
out.writeOptionalWriteable(dataDescription);
@ -212,7 +200,7 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
@Override
public int hashCode() {
// content stream not included
return Objects.hash(jobId, ignoreDowntime, resetStart, resetEnd, dataDescription);
return Objects.hash(jobId, resetStart, resetEnd, dataDescription);
}
@Override
@ -227,7 +215,6 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
// content stream not included
return Objects.equals(jobId, other.jobId) &&
Objects.equals(ignoreDowntime, other.ignoreDowntime) &&
Objects.equals(resetStart, other.resetStart) &&
Objects.equals(resetEnd, other.resetEnd) &&
Objects.equals(dataDescription, other.dataDescription);
@ -255,8 +242,7 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
@Override
protected void taskOperation(Request request, InternalOpenJobAction.JobTask task, ActionListener<Response> listener) {
TimeRange timeRange = TimeRange.builder().startTime(request.getResetStart()).endTime(request.getResetEnd()).build();
DataLoadParams params = new DataLoadParams(timeRange, request.isIgnoreDowntime(),
Optional.ofNullable(request.getDataDescription()));
DataLoadParams params = new DataLoadParams(timeRange, Optional.ofNullable(request.getDataDescription()));
threadPool.executor(MlPlugin.THREAD_POOL_NAME).execute(() -> {
try {
DataCounts dataCounts = processManager.processData(request.getJobId(), request.content.streamInput(), params);

View File

@ -12,12 +12,10 @@ import java.util.Optional;
public class DataLoadParams {
private final TimeRange resetTimeRange;
private final boolean ignoreDowntime;
private final Optional<DataDescription> dataDescription;
public DataLoadParams(TimeRange resetTimeRange, boolean ignoreDowntime, Optional<DataDescription> dataDescription) {
public DataLoadParams(TimeRange resetTimeRange, Optional<DataDescription> dataDescription) {
this.resetTimeRange = Objects.requireNonNull(resetTimeRange);
this.ignoreDowntime = ignoreDowntime;
this.dataDescription = Objects.requireNonNull(dataDescription);
}
@ -33,10 +31,6 @@ public class DataLoadParams {
return resetTimeRange.getEnd();
}
public boolean isIgnoreDowntime() {
return ignoreDowntime;
}
public Optional<DataDescription> getDataDescription() {
return dataDescription;
}

View File

@ -30,7 +30,7 @@ public class RestOpenJobAction extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
OpenJobAction.Request request = new OpenJobAction.Request(restRequest.param(Job.ID.getPreferredName()));
request.setIgnoreDowntime(restRequest.paramAsBoolean(PostDataAction.Request.IGNORE_DOWNTIME.getPreferredName(), false));
request.setIgnoreDowntime(restRequest.paramAsBoolean(OpenJobAction.Request.IGNORE_DOWNTIME.getPreferredName(), false));
if (restRequest.hasParam("open_timeout")) {
TimeValue openTimeout = restRequest.paramAsTime("open_timeout", TimeValue.timeValueSeconds(30));
request.setOpenTimeout(openTimeout);

View File

@ -19,7 +19,6 @@ import java.io.IOException;
public class RestPostDataAction extends BaseRestHandler {
private static final boolean DEFAULT_IGNORE_DOWNTIME = false;
private static final String DEFAULT_RESET_START = "";
private static final String DEFAULT_RESET_END = "";
@ -32,8 +31,6 @@ public class RestPostDataAction extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
PostDataAction.Request request = new PostDataAction.Request(restRequest.param(Job.ID.getPreferredName()));
request.setIgnoreDowntime(
restRequest.paramAsBoolean(PostDataAction.Request.IGNORE_DOWNTIME.getPreferredName(), DEFAULT_IGNORE_DOWNTIME));
request.setResetStart(restRequest.param(PostDataAction.Request.RESET_START.getPreferredName(), DEFAULT_RESET_START));
request.setResetEnd(restRequest.param(PostDataAction.Request.RESET_END.getPreferredName(), DEFAULT_RESET_END));
request.setContent(restRequest.content());

View File

@ -11,7 +11,6 @@ public class PostDataActionRequestTests extends AbstractStreamableTestCase<PostD
@Override
protected PostDataAction.Request createTestInstance() {
PostDataAction.Request request = new PostDataAction.Request(randomAsciiOfLengthBetween(1, 20));
request.setIgnoreDowntime(randomBoolean());
if (randomBoolean()) {
request.setResetStart(randomAsciiOfLengthBetween(1, 20));
}

View File

@ -40,7 +40,7 @@ import static org.mockito.Mockito.when;
public class AutodetectCommunicatorTests extends ESTestCase {
public void testWriteResetBucketsControlMessage() throws IOException {
DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("2").build(), false, Optional.empty());
DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("2").build(), Optional.empty());
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class))) {
communicator.writeToJob(new ByteArrayInputStream(new byte[0]), params);
@ -148,7 +148,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
() -> communicator.writeToJob(in, mock(DataLoadParams.class)));
communicator.inUse.set(null);
communicator.writeToJob(in, new DataLoadParams(TimeRange.builder().build(), false, Optional.empty()));
communicator.writeToJob(in, new DataLoadParams(TimeRange.builder().build(), Optional.empty()));
}
public void testFlushInUse() throws IOException {

View File

@ -174,7 +174,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectProcessManager manager = createManager(communicator);
assertEquals(0, manager.numberOfOpenJobs());
DataLoadParams params = new DataLoadParams(TimeRange.builder().build(), false, Optional.empty());
DataLoadParams params = new DataLoadParams(TimeRange.builder().build(), Optional.empty());
manager.openJob("foo", false, e -> {});
manager.processData("foo", createInputStream(""), params);
assertEquals(1, manager.numberOfOpenJobs());
@ -212,7 +212,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
AutodetectProcessManager manager = createManager(communicator);
DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1000").endTime("2000").build(), true, Optional.empty());
DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1000").endTime("2000").build(), Optional.empty());
InputStream inputStream = createInputStream("");
manager.openJob("foo", false, e -> {});
manager.processData("foo", inputStream, params);

View File

@ -106,7 +106,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
bos, Mockito.mock(InputStream.class), Mockito.mock(InputStream.class),
NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), EsExecutors.newDirectExecutorService())) {
DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("86400").build(), true, Optional.empty());
DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("86400").build(), Optional.empty());
process.writeResetBucketsControlMessage(params);
process.flushStream();

View File

@ -11,17 +11,17 @@ import java.util.Optional;
public class DataLoadParamsTests extends ESTestCase {
public void testGetStart() {
assertEquals("", new DataLoadParams(TimeRange.builder().build(), false, Optional.empty()).getStart());
assertEquals("3", new DataLoadParams(TimeRange.builder().startTime("3").build(), false, Optional.empty()).getStart());
assertEquals("", new DataLoadParams(TimeRange.builder().build(), Optional.empty()).getStart());
assertEquals("3", new DataLoadParams(TimeRange.builder().startTime("3").build(), Optional.empty()).getStart());
}
public void testGetEnd() {
assertEquals("", new DataLoadParams(TimeRange.builder().build(), false, Optional.empty()).getEnd());
assertEquals("1", new DataLoadParams(TimeRange.builder().endTime("1").build(), false, Optional.empty()).getEnd());
assertEquals("", new DataLoadParams(TimeRange.builder().build(), Optional.empty()).getEnd());
assertEquals("1", new DataLoadParams(TimeRange.builder().endTime("1").build(), Optional.empty()).getEnd());
}
public void testIsResettingBuckets() {
assertFalse(new DataLoadParams(TimeRange.builder().build(), false, Optional.empty()).isResettingBuckets());
assertTrue(new DataLoadParams(TimeRange.builder().startTime("5").build(), false, Optional.empty()).isResettingBuckets());
assertFalse(new DataLoadParams(TimeRange.builder().build(), Optional.empty()).isResettingBuckets());
assertTrue(new DataLoadParams(TimeRange.builder().startTime("5").build(), Optional.empty()).isResettingBuckets());
}
}

View File

@ -129,7 +129,7 @@ public class ControlMsgToProcessWriterTests extends ESTestCase {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(lengthEncodedWriter, 2);
writer.writeResetBucketsMessage(
new DataLoadParams(TimeRange.builder().startTime("0").endTime("600").build(), false, Optional.empty()));
new DataLoadParams(TimeRange.builder().startTime("0").endTime("600").build(), Optional.empty()));
InOrder inOrder = inOrder(lengthEncodedWriter);
inOrder.verify(lengthEncodedWriter).writeNumFields(4);