mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-06 04:58:50 +00:00
get watch api: Return the watch status separate from the watch source.
The status isn't maintained by the user but rather by Watcher itself. The idea here is that the get watch api should return the watch as was provided to Watcher via the put watch api. The status will be reported under the top level `_status` field. Original commit: elastic/x-pack-elasticsearch@54e2452493
This commit is contained in:
parent
67fdad6357
commit
ba17333523
@ -43,3 +43,5 @@
|
||||
id: "my_watch"
|
||||
- match: { found : true}
|
||||
- match: { _id: "my_watch" }
|
||||
- match: { _status.version: 1 }
|
||||
- is_false: watch.status
|
||||
|
@ -8,14 +8,15 @@ package org.elasticsearch.watcher.rest.action;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.rest.*;
|
||||
import org.elasticsearch.rest.action.support.RestBuilderListener;
|
||||
import org.elasticsearch.watcher.client.WatcherClient;
|
||||
import org.elasticsearch.watcher.rest.WatcherRestHandler;
|
||||
import org.elasticsearch.watcher.support.xcontent.WatcherParams;
|
||||
import org.elasticsearch.watcher.transport.actions.get.GetWatchRequest;
|
||||
import org.elasticsearch.watcher.transport.actions.get.GetWatchResponse;
|
||||
import org.elasticsearch.watcher.watch.WatchStatus;
|
||||
|
||||
import static org.elasticsearch.rest.RestRequest.Method.GET;
|
||||
import static org.elasticsearch.rest.RestStatus.NOT_FOUND;
|
||||
@ -30,17 +31,20 @@ public class RestGetWatchAction extends WatcherRestHandler {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void handleRequest(RestRequest request, RestChannel channel, WatcherClient client) throws Exception {
|
||||
protected void handleRequest(final RestRequest request, RestChannel channel, WatcherClient client) throws Exception {
|
||||
final GetWatchRequest getWatchRequest = new GetWatchRequest(request.param("id"));
|
||||
client.getWatch(getWatchRequest, new RestBuilderListener<GetWatchResponse>(channel) {
|
||||
@Override
|
||||
public RestResponse buildResponse(GetWatchResponse response, XContentBuilder builder) throws Exception {
|
||||
builder.startObject()
|
||||
.field("found", response.isFound())
|
||||
.field("_id", response.getId())
|
||||
.field("_version", response.getVersion());
|
||||
.field("_id", response.getId());
|
||||
if (response.isFound()) {
|
||||
builder.field("watch", response.getSource(), ToXContent.EMPTY_PARAMS);
|
||||
WatcherParams params = WatcherParams.builder(request)
|
||||
.put(WatchStatus.INCLUDE_VERSION_KEY, true)
|
||||
.build();
|
||||
builder.field("_status", response.getStatus(), params);
|
||||
builder.field("watch", response.getSource(), params);
|
||||
}
|
||||
builder.endObject();
|
||||
|
||||
|
@ -85,6 +85,11 @@ public class WatcherParams extends ToXContent.DelegatingMapParams {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder put(String key, Object value) {
|
||||
params.put(key, String.valueOf(value));
|
||||
return this;
|
||||
}
|
||||
|
||||
public WatcherParams build() {
|
||||
return new WatcherParams(params.build(), delegate);
|
||||
}
|
||||
|
@ -11,13 +11,14 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.watcher.support.xcontent.XContentSource;
|
||||
import org.elasticsearch.watcher.watch.WatchStatus;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class GetWatchResponse extends ActionResponse {
|
||||
|
||||
private String id;
|
||||
private long version = -1;
|
||||
private WatchStatus status;
|
||||
private boolean found = false;
|
||||
private XContentSource source;
|
||||
|
||||
@ -29,7 +30,6 @@ public class GetWatchResponse extends ActionResponse {
|
||||
*/
|
||||
public GetWatchResponse(String id) {
|
||||
this.id = id;
|
||||
this.version = -1;
|
||||
this.found = false;
|
||||
this.source = null;
|
||||
}
|
||||
@ -37,9 +37,9 @@ public class GetWatchResponse extends ActionResponse {
|
||||
/**
|
||||
* ctor for found watch
|
||||
*/
|
||||
public GetWatchResponse(String id, long version, BytesReference source, XContentType contentType) {
|
||||
public GetWatchResponse(String id, WatchStatus status, BytesReference source, XContentType contentType) {
|
||||
this.id = id;
|
||||
this.version = version;
|
||||
this.status = status;
|
||||
this.found = true;
|
||||
this.source = new XContentSource(source, contentType);
|
||||
}
|
||||
@ -48,11 +48,10 @@ public class GetWatchResponse extends ActionResponse {
|
||||
return id;
|
||||
}
|
||||
|
||||
public long getVersion() {
|
||||
return version;
|
||||
public WatchStatus getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
||||
|
||||
public boolean isFound() {
|
||||
return found;
|
||||
}
|
||||
@ -66,8 +65,10 @@ public class GetWatchResponse extends ActionResponse {
|
||||
super.readFrom(in);
|
||||
id = in.readString();
|
||||
found = in.readBoolean();
|
||||
version = in.readLong();
|
||||
source = found ? XContentSource.readFrom(in) : null;
|
||||
if (found) {
|
||||
status = WatchStatus.read(in);
|
||||
source = XContentSource.readFrom(in);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -75,8 +76,8 @@ public class GetWatchResponse extends ActionResponse {
|
||||
super.writeTo(out);
|
||||
out.writeString(id);
|
||||
out.writeBoolean(found);
|
||||
out.writeLong(version);
|
||||
if (found) {
|
||||
status.writeTo(out);
|
||||
XContentSource.writeTo(source, out);
|
||||
}
|
||||
}
|
||||
|
@ -64,9 +64,12 @@ public class TransportGetWatchAction extends WatcherTransportAction<GetWatchRequ
|
||||
}
|
||||
|
||||
try (XContentBuilder builder = jsonBuilder()) {
|
||||
// When we return the watch via the get api, we want to return the watch as was specified in the put api,
|
||||
// we don't include the status in the watch source itself, but as a separate top level field, so that
|
||||
// it indicates the the status is managed by watcher itself.
|
||||
watch.toXContent(builder, WatcherParams.builder().hideSecrets(true).build());
|
||||
BytesReference watchSource = builder.bytes();
|
||||
listener.onResponse(new GetWatchResponse(watch.id(), watch.status().version(), watchSource, XContentType.JSON));
|
||||
listener.onResponse(new GetWatchResponse(watch.id(), watch.status(), watchSource, XContentType.JSON));
|
||||
} catch (IOException e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
|
@ -12,6 +12,7 @@ import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.watcher.support.xcontent.WatcherParams;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.DateTimeZone;
|
||||
import org.joda.time.PeriodType;
|
||||
@ -52,6 +53,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
public class Watch implements TriggerEngine.Job, ToXContent {
|
||||
|
||||
public static final String ALL_ACTIONS_ID = "_all";
|
||||
public static final String INCLUDE_STATUS_KEY = "include_status";
|
||||
|
||||
private final String id;
|
||||
private final Trigger trigger;
|
||||
@ -174,7 +176,9 @@ public class Watch implements TriggerEngine.Job, ToXContent {
|
||||
if (metadata != null) {
|
||||
builder.field(Field.METADATA.getPreferredName(), metadata);
|
||||
}
|
||||
builder.field(Field.STATUS.getPreferredName(), status, params);
|
||||
if (params.paramAsBoolean(INCLUDE_STATUS_KEY, false)) {
|
||||
builder.field(Field.STATUS.getPreferredName(), status, params);
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
@ -183,7 +187,7 @@ public class Watch implements TriggerEngine.Job, ToXContent {
|
||||
// we don't want to cache this and instead rebuild it every time on demand. The watch is in
|
||||
// memory and we don't need this redundancy
|
||||
try {
|
||||
return toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS).bytes();
|
||||
return toXContent(jsonBuilder(), WatcherParams.builder().put(Watch.INCLUDE_STATUS_KEY, true).build()).bytes();
|
||||
} catch (IOException ioe) {
|
||||
throw new WatcherException("could not serialize watch [{}]", ioe, id);
|
||||
}
|
||||
|
@ -32,6 +32,8 @@ import static org.elasticsearch.watcher.support.WatcherDateTimeUtils.*;
|
||||
*/
|
||||
public class WatchStatus implements ToXContent, Streamable {
|
||||
|
||||
public static final String INCLUDE_VERSION_KEY = "include_version";
|
||||
|
||||
private transient long version;
|
||||
|
||||
private @Nullable DateTime lastChecked;
|
||||
@ -211,6 +213,9 @@ public class WatchStatus implements ToXContent, Streamable {
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
if (params.paramAsBoolean(INCLUDE_VERSION_KEY, false)) {
|
||||
builder.field(Field.VERSION.getPreferredName(), version);
|
||||
}
|
||||
if (lastChecked != null) {
|
||||
builder.field(Field.LAST_CHECKED.getPreferredName(), lastChecked);
|
||||
}
|
||||
@ -272,6 +277,7 @@ public class WatchStatus implements ToXContent, Streamable {
|
||||
|
||||
|
||||
interface Field {
|
||||
ParseField VERSION = new ParseField("version");
|
||||
ParseField LAST_CHECKED = new ParseField("last_checked");
|
||||
ParseField LAST_MET_CONDITION = new ParseField("last_met_condition");
|
||||
ParseField ACTIONS = new ParseField("actions");
|
||||
|
@ -6,6 +6,7 @@
|
||||
package org.elasticsearch.watcher.execution;
|
||||
|
||||
import org.elasticsearch.watcher.support.clock.SystemClock;
|
||||
import org.elasticsearch.watcher.transport.actions.get.GetWatchResponse;
|
||||
import org.joda.time.DateTime;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
@ -136,10 +137,10 @@ public class ManualExecutionTests extends AbstractWatcherIntegrationTests {
|
||||
Watch testWatch = watchService().getWatch("_id");
|
||||
if (recordExecution) {
|
||||
refresh();
|
||||
Watch persistedWatch = watchParser().parse("_id", true, watcherClient().getWatch(new GetWatchRequest("_id")).actionGet().getSource().getBytes());
|
||||
if (ignoreCondition || conditionAlwaysTrue) {
|
||||
assertThat(testWatch.status().actionStatus("log").ackStatus().state(), equalTo(ActionStatus.AckStatus.State.ACKABLE));
|
||||
assertThat(persistedWatch.status().actionStatus("log").ackStatus().state(), equalTo(ActionStatus.AckStatus.State.ACKABLE));
|
||||
GetWatchResponse response = watcherClient().getWatch(new GetWatchRequest("_id")).actionGet();
|
||||
assertThat(response.getStatus().actionStatus("log").ackStatus().state(), equalTo(ActionStatus.AckStatus.State.ACKABLE));
|
||||
} else {
|
||||
assertThat(testWatch.status().actionStatus("log").ackStatus().state(), equalTo(ActionStatus.AckStatus.State.AWAITS_SUCCESSFUL_EXECUTION));
|
||||
}
|
||||
|
@ -23,8 +23,8 @@ import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.watcher.WatcherVersion;
|
||||
import org.elasticsearch.watcher.actions.ActionStatus;
|
||||
import org.elasticsearch.watcher.history.HistoryStore;
|
||||
import org.elasticsearch.watcher.support.xcontent.XContentSource;
|
||||
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
|
||||
import org.elasticsearch.watcher.transport.actions.get.GetWatchResponse;
|
||||
import org.elasticsearch.watcher.transport.actions.put.PutWatchResponse;
|
||||
import org.elasticsearch.watcher.transport.actions.service.WatcherServiceResponse;
|
||||
import org.junit.Test;
|
||||
@ -246,8 +246,8 @@ public class LicenseIntegrationTests extends AbstractWatcherIntegrationTests {
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
XContentSource source = watcherClient().prepareGetWatch(watchName).get().getSource();
|
||||
assertThat(source.getValue("status.actions._index.ack.state"), is((Object) "ackable"));
|
||||
GetWatchResponse response = watcherClient().prepareGetWatch(watchName).get();
|
||||
assertThat(response.getStatus().actionStatus("_index").ackStatus().state(), equalTo(ActionStatus.AckStatus.State.ACKABLE));
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -257,13 +257,12 @@ public class WatchAckTests extends AbstractWatcherIntegrationTests {
|
||||
}
|
||||
|
||||
GetWatchResponse watchResponse = watcherClient.getWatch(new GetWatchRequest("_name")).actionGet();
|
||||
Watch watch = watchParser().parse("_name", true, watchResponse.getSource().getBytes());
|
||||
assertThat(watch.status().actionStatus("_id").ackStatus().state(), Matchers.equalTo(ActionStatus.AckStatus.State.ACKED));
|
||||
assertThat(watchResponse.getStatus().actionStatus("_id").ackStatus().state(), Matchers.equalTo(ActionStatus.AckStatus.State.ACKED));
|
||||
|
||||
refresh();
|
||||
GetResponse getResponse = client().get(new GetRequest(WatchStore.INDEX, WatchStore.DOC_TYPE, "_name")).actionGet();
|
||||
Watch indexedWatch = watchParser().parse("_name", true, getResponse.getSourceAsBytesRef());
|
||||
assertThat(watch.status().actionStatus("_id").ackStatus().state(), Matchers.equalTo(indexedWatch.status().actionStatus("_id").ackStatus().state()));
|
||||
assertThat(watchResponse.getStatus().actionStatus("_id").ackStatus().state(), Matchers.equalTo(indexedWatch.status().actionStatus("_id").ackStatus().state()));
|
||||
|
||||
if (timeWarped()) {
|
||||
timeWarp().scheduler().trigger("_name", 4, TimeValue.timeValueSeconds(5));
|
||||
|
@ -31,7 +31,6 @@ public class GetWatchTests extends AbstractWatcherIntegrationTests {
|
||||
|
||||
@Test
|
||||
public void testGet() throws Exception {
|
||||
ensureWatcherStarted();
|
||||
PutWatchResponse putResponse = watcherClient().preparePutWatch("_name").setSource(watchBuilder()
|
||||
.trigger(schedule(interval("5m")))
|
||||
.input(simpleInput())
|
||||
@ -46,14 +45,14 @@ public class GetWatchTests extends AbstractWatcherIntegrationTests {
|
||||
assertThat(getResponse, notNullValue());
|
||||
assertThat(getResponse.isFound(), is(true));
|
||||
assertThat(getResponse.getId(), is("_name"));
|
||||
assertThat(getResponse.getVersion(), is(putResponse.getVersion()));
|
||||
assertThat(getResponse.getStatus().version(), is(putResponse.getVersion()));
|
||||
Map<String, Object> source = getResponse.getSource().getAsMap();
|
||||
assertThat(source, notNullValue());
|
||||
assertThat(source, hasKey("trigger"));
|
||||
assertThat(source, hasKey("input"));
|
||||
assertThat(source, hasKey("condition"));
|
||||
assertThat(source, hasKey("actions"));
|
||||
assertThat(source, hasKey("status"));
|
||||
assertThat(source, not(hasKey("status")));
|
||||
}
|
||||
|
||||
@Test(expected = ActionRequestValidationException.class)
|
||||
@ -63,13 +62,11 @@ public class GetWatchTests extends AbstractWatcherIntegrationTests {
|
||||
|
||||
@Test
|
||||
public void testGet_NotFound() throws Exception {
|
||||
ensureWatcherStarted();
|
||||
|
||||
GetWatchResponse getResponse = watcherClient().getWatch(new GetWatchRequest("_name")).get();
|
||||
assertThat(getResponse, notNullValue());
|
||||
assertThat(getResponse.getId(), is("_name"));
|
||||
assertThat(getResponse.getVersion(), is(-1L));
|
||||
assertThat(getResponse.isFound(), is(false));
|
||||
assertThat(getResponse.getStatus(), nullValue());
|
||||
assertThat(getResponse.getSource(), nullValue());
|
||||
XContentSource source = getResponse.getSource();
|
||||
assertThat(source, nullValue());
|
||||
|
Loading…
x
Reference in New Issue
Block a user