HLRC: Add get watch API (#35531)

This changes adds the support for the get watch API in the high level rest client.
This commit is contained in:
Jim Ferenczi 2018-11-30 11:02:46 +01:00 committed by GitHub
parent fa3d365ee8
commit 5c7b2c5f9b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 676 additions and 15 deletions

View File

@ -26,6 +26,8 @@ import org.elasticsearch.client.watcher.ActivateWatchRequest;
import org.elasticsearch.client.watcher.ActivateWatchResponse;
import org.elasticsearch.client.watcher.AckWatchRequest;
import org.elasticsearch.client.watcher.AckWatchResponse;
import org.elasticsearch.client.watcher.GetWatchRequest;
import org.elasticsearch.client.watcher.GetWatchResponse;
import org.elasticsearch.client.watcher.StartWatchServiceRequest;
import org.elasticsearch.client.watcher.StopWatchServiceRequest;
import org.elasticsearch.client.watcher.DeleteWatchRequest;
@ -129,6 +131,34 @@ public final class WatcherClient {
PutWatchResponse::fromXContent, listener, emptySet());
}
/**
* Gets a watch from the cluster
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-get-watch.html">
* the docs</a> for more.
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public GetWatchResponse getWatch(GetWatchRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request, WatcherRequestConverters::getWatch, options,
GetWatchResponse::fromXContent, emptySet());
}
/**
* Asynchronously gets a watch into the cluster
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-get-watch.html">
* the docs</a> for more.
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
*/
public void getWatchAsync(GetWatchRequest request, RequestOptions options,
ActionListener<GetWatchResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(request, WatcherRequestConverters::getWatch, options,
GetWatchResponse::fromXContent, listener, emptySet());
}
/**
* Deactivate an existing watch
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-deactivate-watch.html">

View File

@ -28,12 +28,13 @@ import org.apache.http.entity.ContentType;
import org.elasticsearch.client.watcher.DeactivateWatchRequest;
import org.elasticsearch.client.watcher.ActivateWatchRequest;
import org.elasticsearch.client.watcher.AckWatchRequest;
import org.elasticsearch.client.watcher.DeleteWatchRequest;
import org.elasticsearch.client.watcher.GetWatchRequest;
import org.elasticsearch.client.watcher.PutWatchRequest;
import org.elasticsearch.client.watcher.StartWatchServiceRequest;
import org.elasticsearch.client.watcher.StopWatchServiceRequest;
import org.elasticsearch.client.watcher.WatcherStatsRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.client.watcher.DeleteWatchRequest;
import org.elasticsearch.client.watcher.PutWatchRequest;
final class WatcherRequestConverters {
@ -76,6 +77,16 @@ final class WatcherRequestConverters {
return request;
}
static Request getWatch(GetWatchRequest getWatchRequest) {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_xpack", "watcher", "watch")
.addPathPart(getWatchRequest.getId())
.build();
return new Request(HttpGet.METHOD_NAME, endpoint);
}
static Request deactivateWatch(DeactivateWatchRequest deactivateWatchRequest) {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_xpack")

View File

@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.watcher;
import org.elasticsearch.client.Validatable;
import org.elasticsearch.client.ValidationException;
/**
* The request to get the watch by name (id)
*/
public final class GetWatchRequest implements Validatable {
private final String id;
public GetWatchRequest(String watchId) {
validateId(watchId);
this.id = watchId;
}
private void validateId(String id) {
ValidationException exception = new ValidationException();
if (id == null) {
exception.addValidationError("watch id is missing");
} else if (PutWatchRequest.isValidId(id) == false) {
exception.addValidationError("watch id contains whitespace");
}
if (exception.validationErrors().isEmpty() == false) {
throw exception;
}
}
/**
* @return The name of the watch to retrieve
*/
public String getId() {
return id;
}
}

View File

@ -0,0 +1,148 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.watcher;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
public class GetWatchResponse {
private final String id;
private final long version;
private final WatchStatus status;
private final BytesReference source;
private final XContentType xContentType;
/**
* Ctor for missing watch
*/
public GetWatchResponse(String id) {
this(id, Versions.NOT_FOUND, null, null, null);
}
public GetWatchResponse(String id, long version, WatchStatus status, BytesReference source, XContentType xContentType) {
this.id = id;
this.version = version;
this.status = status;
this.source = source;
this.xContentType = xContentType;
}
public String getId() {
return id;
}
public long getVersion() {
return version;
}
public boolean isFound() {
return version != Versions.NOT_FOUND;
}
public WatchStatus getStatus() {
return status;
}
/**
* Returns the {@link XContentType} of the source
*/
public XContentType getContentType() {
return xContentType;
}
/**
* Returns the serialized watch
*/
public BytesReference getSource() {
return source;
}
/**
* Returns the source as a map
*/
public Map<String, Object> getSourceAsMap() {
return source == null ? null : XContentHelper.convertToMap(source, false, getContentType()).v2();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
GetWatchResponse that = (GetWatchResponse) o;
return version == that.version &&
Objects.equals(id, that.id) &&
Objects.equals(status, that.status) &&
Objects.equals(xContentType, that.xContentType) &&
Objects.equals(source, that.source);
}
@Override
public int hashCode() {
return Objects.hash(id, status, source, version);
}
private static final ParseField ID_FIELD = new ParseField("_id");
private static final ParseField FOUND_FIELD = new ParseField("found");
private static final ParseField VERSION_FIELD = new ParseField("_version");
private static final ParseField STATUS_FIELD = new ParseField("status");
private static final ParseField WATCH_FIELD = new ParseField("watch");
private static ConstructingObjectParser<GetWatchResponse, Void> PARSER =
new ConstructingObjectParser<>("get_watch_response", true,
a -> {
boolean isFound = (boolean) a[1];
if (isFound) {
XContentBuilder builder = (XContentBuilder) a[4];
BytesReference source = BytesReference.bytes(builder);
return new GetWatchResponse((String) a[0], (long) a[2], (WatchStatus) a[3], source, builder.contentType());
} else {
return new GetWatchResponse((String) a[0]);
}
});
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_FIELD);
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), FOUND_FIELD);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), VERSION_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(parser, context) -> WatchStatus.parse(parser), STATUS_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(parser, context) -> {
try (XContentBuilder builder = XContentBuilder.builder(parser.contentType().xContent())) {
builder.copyCurrentStructure(parser);
return builder;
}
}, WATCH_FIELD);
}
public static GetWatchResponse fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.client.watcher;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.XContentParser;
import org.joda.time.DateTime;
@ -44,19 +45,22 @@ public class WatchStatus {
private final DateTime lastMetCondition;
private final long version;
private final Map<String, ActionStatus> actions;
@Nullable private Map<String, String> headers;
public WatchStatus(long version,
State state,
ExecutionState executionState,
DateTime lastChecked,
DateTime lastMetCondition,
Map<String, ActionStatus> actions) {
Map<String, ActionStatus> actions,
Map<String, String> headers) {
this.version = version;
this.lastChecked = lastChecked;
this.lastMetCondition = lastMetCondition;
this.actions = actions;
this.state = state;
this.executionState = executionState;
this.headers = headers;
}
public State state() {
@ -79,6 +83,10 @@ public class WatchStatus {
return actions.get(actionId);
}
public Map<String, ActionStatus> getActions() {
return actions;
}
public long version() {
return version;
}
@ -87,6 +95,10 @@ public class WatchStatus {
return executionState;
}
public Map<String, String> getHeaders() {
return headers;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
@ -98,7 +110,8 @@ public class WatchStatus {
Objects.equals(lastMetCondition, that.lastMetCondition) &&
Objects.equals(version, that.version) &&
Objects.equals(executionState, that.executionState) &&
Objects.equals(actions, that.actions);
Objects.equals(actions, that.actions) &&
Objects.equals(headers, that.headers);
}
@Override
@ -112,6 +125,7 @@ public class WatchStatus {
DateTime lastChecked = null;
DateTime lastMetCondition = null;
Map<String, ActionStatus> actions = null;
Map<String, String> headers = null;
long version = -1;
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation);
@ -172,13 +186,17 @@ public class WatchStatus {
throw new ElasticsearchParseException("could not parse watch status. expecting field [{}] to be an object, " +
"found [{}] instead", currentFieldName, token);
}
} else if (Field.HEADERS.match(currentFieldName, parser.getDeprecationHandler())) {
if (token == XContentParser.Token.START_OBJECT) {
headers = parser.mapStrings();
}
} else {
parser.skipChildren();
}
}
actions = actions == null ? emptyMap() : unmodifiableMap(actions);
return new WatchStatus(version, state, executionState, lastChecked, lastMetCondition, actions);
return new WatchStatus(version, state, executionState, lastChecked, lastMetCondition, actions, headers);
}
public static class State {
@ -214,6 +232,8 @@ public class WatchStatus {
active = parser.booleanValue();
} else if (Field.TIMESTAMP.match(currentFieldName, parser.getDeprecationHandler())) {
timestamp = parseDate(currentFieldName, parser);
} else {
parser.skipChildren();
}
}
return new State(active, timestamp);
@ -229,5 +249,6 @@ public class WatchStatus {
ParseField ACTIONS = new ParseField("actions");
ParseField VERSION = new ParseField("version");
ParseField EXECUTION_STATE = new ParseField("execution_state");
ParseField HEADERS = new ParseField("headers");
}
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.client.watcher.ActivateWatchRequest;
import org.elasticsearch.client.watcher.DeactivateWatchRequest;
import org.elasticsearch.client.watcher.DeleteWatchRequest;
import org.elasticsearch.client.watcher.PutWatchRequest;
import org.elasticsearch.client.watcher.GetWatchRequest;
import org.elasticsearch.client.watcher.StartWatchServiceRequest;
import org.elasticsearch.client.watcher.StopWatchServiceRequest;
import org.elasticsearch.client.watcher.WatcherStatsRequest;
@ -91,6 +92,16 @@ public class WatcherRequestConvertersTests extends ESTestCase {
assertThat(bos.toString("UTF-8"), is(body));
}
public void testGetWatch() throws Exception {
String watchId = randomAlphaOfLength(10);
GetWatchRequest getWatchRequest = new GetWatchRequest(watchId);
Request request = WatcherRequestConverters.getWatch(getWatchRequest);
assertEquals(HttpGet.METHOD_NAME, request.getMethod());
assertEquals("/_xpack/watcher/watch/" + watchId, request.getEndpoint());
assertThat(request.getEntity(), nullValue());
}
public void testDeactivateWatch() {
String watchId = randomAlphaOfLength(10);
DeactivateWatchRequest deactivateWatchRequest = new DeactivateWatchRequest(watchId);

View File

@ -34,6 +34,8 @@ import org.elasticsearch.client.watcher.ActionStatus;
import org.elasticsearch.client.watcher.ActionStatus.AckStatus;
import org.elasticsearch.client.watcher.DeactivateWatchRequest;
import org.elasticsearch.client.watcher.DeactivateWatchResponse;
import org.elasticsearch.client.watcher.GetWatchRequest;
import org.elasticsearch.client.watcher.GetWatchResponse;
import org.elasticsearch.client.watcher.StartWatchServiceRequest;
import org.elasticsearch.client.watcher.StopWatchServiceRequest;
import org.elasticsearch.client.watcher.WatchStatus;
@ -197,6 +199,51 @@ public class WatcherDocumentationIT extends ESRestHighLevelClientTestCase {
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
{
//tag::get-watch-request
GetWatchRequest request = new GetWatchRequest("my_watch_id");
//end::get-watch-request
//tag::ack-watch-execute
GetWatchResponse response = client.watcher().getWatch(request, RequestOptions.DEFAULT);
//end::get-watch-request
//tag::get-watch-response
String watchId = response.getId(); // <1>
boolean found = response.isFound(); // <2>
long version = response.getVersion(); // <3>
WatchStatus status = response.getStatus(); // <4>
BytesReference source = response.getSource(); // <5>
//end::get-watch-response
}
{
GetWatchRequest request = new GetWatchRequest("my_other_watch_id");
// tag::get-watch-execute-listener
ActionListener<GetWatchResponse> listener = new ActionListener<GetWatchResponse>() {
@Override
public void onResponse(GetWatchResponse response) {
// <1>
}
@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::get-watch-execute-listener
// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);
// tag::get-watch-execute-async
client.watcher().getWatchAsync(request, RequestOptions.DEFAULT, listener); // <1>
// end::get-watch-execute-async
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
{
//tag::x-pack-delete-watch-execute
DeleteWatchRequest request = new DeleteWatchRequest("my_watch_id");

View File

@ -91,4 +91,10 @@ public class WatchRequestValidationTests extends ESTestCase {
() -> new PutWatchRequest("foo", BytesArray.EMPTY, null));
assertThat(exception.getMessage(), is("request body is missing"));
}
public void testGetWatchInvalidWatchId() {
ValidationException e = expectThrows(ValidationException.class,
() -> new GetWatchRequest("id with whitespaces"));
assertThat(e.validationErrors(), hasItem("watch id contains whitespace"));
}
}

View File

@ -422,6 +422,7 @@ The Java High Level REST Client supports the following Watcher APIs:
* <<{upid}-start-watch-service>>
* <<{upid}-stop-watch-service>>
* <<java-rest-high-x-pack-watcher-put-watch>>
* <<java-rest-high-x-pack-watcher-get-watch>>
* <<java-rest-high-x-pack-watcher-delete-watch>>
* <<java-rest-high-watcher-deactivate-watch>>
* <<{upid}-ack-watch>>
@ -431,6 +432,7 @@ The Java High Level REST Client supports the following Watcher APIs:
include::watcher/start-watch-service.asciidoc[]
include::watcher/stop-watch-service.asciidoc[]
include::watcher/put-watch.asciidoc[]
include::watcher/get-watch.asciidoc[]
include::watcher/delete-watch.asciidoc[]
include::watcher/ack-watch.asciidoc[]
include::watcher/deactivate-watch.asciidoc[]

View File

@ -0,0 +1,36 @@
--
:api: get-watch
:request: GetWatchRequest
:response: GetWatchResponse
--
[id="{upid}-{api}"]
=== Get Watch API
[id="{upid}-{api}-request"]
==== Execution
A watch can be retrieved as follows:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-request]
--------------------------------------------------
[id="{upid}-{api}-response"]
==== Response
The returned +{response}+ contains `id`, `version`, `status` and `source`
information.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-response]
--------------------------------------------------
<1> `_id`, id of the watch
<2> `found` is a boolean indicating whether the watch was found
<2> `_version` returns the version of the watch
<3> `status` contains status of the watch
<4> `source` the source of the watch
include::../execution.asciidoc[]

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* Encapsulates the xcontent source
@ -51,6 +52,13 @@ public class XContentSource implements ToXContent {
this(BytesReference.bytes(builder), builder.contentType());
}
/**
* @return The content type of the source
*/
public XContentType getContentType() {
return contentType;
}
/**
* @return The bytes reference of the source
*/
@ -133,4 +141,17 @@ public class XContentSource implements ToXContent {
return data;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
XContentSource that = (XContentSource) o;
return Objects.equals(bytes, that.bytes) &&
contentType == that.contentType;
}
@Override
public int hashCode() {
return Objects.hash(bytes, contentType);
}
}

View File

@ -6,21 +6,23 @@
package org.elasticsearch.xpack.core.watcher.transport.actions.get;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource;
import org.elasticsearch.xpack.core.watcher.watch.WatchStatus;
import java.io.IOException;
import java.util.Objects;
public class GetWatchResponse extends ActionResponse {
public class GetWatchResponse extends ActionResponse implements ToXContent {
private String id;
private WatchStatus status;
private boolean found = false;
private boolean found;
private XContentSource source;
private long version;
@ -32,19 +34,20 @@ public class GetWatchResponse extends ActionResponse {
*/
public GetWatchResponse(String id) {
this.id = id;
this.status = null;
this.found = false;
this.source = null;
version = Versions.NOT_FOUND;
this.version = Versions.NOT_FOUND;
}
/**
* ctor for found watch
*/
public GetWatchResponse(String id, long version, WatchStatus status, BytesReference source, XContentType contentType) {
public GetWatchResponse(String id, long version, WatchStatus status, XContentSource source) {
this.id = id;
this.status = status;
this.found = true;
this.source = new XContentSource(source, contentType);
this.source = source;
this.version = version;
}
@ -77,6 +80,10 @@ public class GetWatchResponse extends ActionResponse {
status = WatchStatus.read(in);
source = XContentSource.readFrom(in);
version = in.readZLong();
} else {
status = null;
source = null;
version = Versions.NOT_FOUND;
}
}
@ -91,4 +98,37 @@ public class GetWatchResponse extends ActionResponse {
out.writeZLong(version);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("found", found);
builder.field("_id", id);
if (found) {
builder.field("_version", version);
builder.field("status", status, params);
builder.field("watch", source, params);
}
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
GetWatchResponse that = (GetWatchResponse) o;
return version == that.version &&
Objects.equals(id, that.id) &&
Objects.equals(status, that.status) &&
Objects.equals(source, that.source);
}
@Override
public int hashCode() {
return Objects.hash(id, status, version);
}
@Override
public String toString() {
return Strings.toString(this);
}
}

View File

@ -57,8 +57,8 @@ public class WatchStatus implements ToXContentObject, Streamable {
this(-1, new State(true, now), null, null, null, actions, Collections.emptyMap());
}
private WatchStatus(long version, State state, ExecutionState executionState, DateTime lastChecked, DateTime lastMetCondition,
Map<String, ActionStatus> actions, Map<String, String> headers) {
public WatchStatus(long version, State state, ExecutionState executionState, DateTime lastChecked, DateTime lastMetCondition,
Map<String, ActionStatus> actions, Map<String, String> headers) {
this.version = version;
this.lastChecked = lastChecked;
this.lastMetCondition = lastMetCondition;
@ -340,6 +340,8 @@ public class WatchStatus implements ToXContentObject, Streamable {
if (token == XContentParser.Token.START_OBJECT) {
headers = parser.mapStrings();
}
} else {
parser.skipChildren();
}
}
@ -395,6 +397,8 @@ public class WatchStatus implements ToXContentObject, Streamable {
active = parser.booleanValue();
} else if (Field.TIMESTAMP.match(currentFieldName, parser.getDeprecationHandler())) {
timestamp = parseDate(currentFieldName, parser, UTC);
} else {
parser.skipChildren();
}
}
return new State(active, timestamp);

View File

@ -0,0 +1,229 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.protocol.xpack.watcher;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.protocol.AbstractHlrcStreamableXContentTestCase;
import org.elasticsearch.xpack.core.watcher.actions.ActionStatus;
import org.elasticsearch.xpack.core.watcher.execution.ExecutionState;
import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource;
import org.elasticsearch.xpack.core.watcher.transport.actions.get.GetWatchResponse;
import org.elasticsearch.xpack.core.watcher.watch.WatchStatus;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;
public class GetWatchResponseTests extends
AbstractHlrcStreamableXContentTestCase<GetWatchResponse, org.elasticsearch.client.watcher.GetWatchResponse> {
private static final String[] SHUFFLE_FIELDS_EXCEPTION = new String[] { "watch" };
@Override
protected String[] getShuffleFieldsExceptions() {
return SHUFFLE_FIELDS_EXCEPTION;
}
@Override
protected ToXContent.Params getToXContentParams() {
return new ToXContent.MapParams(Collections.singletonMap("hide_headers", "false"));
}
@Override
protected Predicate<String> getRandomFieldsExcludeFilter() {
return f -> f.contains("watch") || f.contains("actions") || f.contains("headers");
}
@Override
protected void assertEqualInstances(GetWatchResponse expectedInstance, GetWatchResponse newInstance) {
if (expectedInstance.isFound() &&
expectedInstance.getSource().getContentType() != newInstance.getSource().getContentType()) {
/**
* The {@link GetWatchResponse#getContentType()} depends on the content type that
* was used to serialize the main object so we use the same content type than the
* <code>expectedInstance</code> to translate the watch of the <code>newInstance</code>.
*/
XContent from = XContentFactory.xContent(newInstance.getSource().getContentType());
XContent to = XContentFactory.xContent(expectedInstance.getSource().getContentType());
final BytesReference newSource;
// It is safe to use EMPTY here because this never uses namedObject
try (InputStream stream = newInstance.getSource().getBytes().streamInput();
XContentParser parser = XContentFactory.xContent(from.type()).createParser(NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, stream)) {
parser.nextToken();
XContentBuilder builder = XContentFactory.contentBuilder(to.type());
builder.copyCurrentStructure(parser);
newSource = BytesReference.bytes(builder);
} catch (IOException e) {
throw new AssertionError(e);
}
newInstance = new GetWatchResponse(newInstance.getId(), newInstance.getVersion(),
newInstance.getStatus(), new XContentSource(newSource, expectedInstance.getSource().getContentType()));
}
super.assertEqualInstances(expectedInstance, newInstance);
}
@Override
protected GetWatchResponse createBlankInstance() {
return new GetWatchResponse();
}
@Override
protected GetWatchResponse createTestInstance() {
String id = randomAlphaOfLength(10);
if (rarely()) {
return new GetWatchResponse(id);
}
long version = randomLongBetween(0, 10);
WatchStatus status = randomWatchStatus();
BytesReference source = simpleWatch();
return new GetWatchResponse(id, version, status, new XContentSource(source, XContentType.JSON));
}
private static BytesReference simpleWatch() {
try {
XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent());
builder.startObject()
.startObject("trigger")
.startObject("schedule")
.field("interval", "10h")
.endObject()
.endObject()
.startObject("input")
.startObject("none").endObject()
.endObject()
.startObject("actions")
.startObject("logme")
.field("text", "{{ctx.payload}}")
.endObject()
.endObject().endObject();
return BytesReference.bytes(builder);
} catch (IOException e) {
throw new AssertionError(e);
}
}
private static WatchStatus randomWatchStatus() {
long version = randomLongBetween(-1, Long.MAX_VALUE);
WatchStatus.State state = new WatchStatus.State(randomBoolean(), DateTime.now(DateTimeZone.UTC));
ExecutionState executionState = randomFrom(ExecutionState.values());
DateTime lastChecked = rarely() ? null : DateTime.now(DateTimeZone.UTC);
DateTime lastMetCondition = rarely() ? null : DateTime.now(DateTimeZone.UTC);
int size = randomIntBetween(0, 5);
Map<String, ActionStatus> actionMap = new HashMap<>();
for (int i = 0; i < size; i++) {
ActionStatus.AckStatus ack = new ActionStatus.AckStatus(
DateTime.now(DateTimeZone.UTC),
randomFrom(ActionStatus.AckStatus.State.values())
);
ActionStatus actionStatus = new ActionStatus(
ack,
randomBoolean() ? null : randomExecution(),
randomBoolean() ? null : randomExecution(),
randomBoolean() ? null : randomThrottle()
);
actionMap.put(randomAlphaOfLength(10), actionStatus);
}
Map<String, String> headers = randomBoolean() ? new HashMap<>() : null;
if (headers != null) {
int headerSize = randomIntBetween(1, 5);
for (int i = 0; i < headerSize; i++) {
headers.put(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLengthBetween(1, 10));
}
}
return new WatchStatus(version, state, executionState, lastChecked, lastMetCondition, actionMap, headers);
}
private static ActionStatus.Throttle randomThrottle() {
return new ActionStatus.Throttle(DateTime.now(DateTimeZone.UTC), randomAlphaOfLengthBetween(10, 20));
}
private static ActionStatus.Execution randomExecution() {
if (randomBoolean()) {
return null;
} else if (randomBoolean()) {
return ActionStatus.Execution.failure(DateTime.now(DateTimeZone.UTC), randomAlphaOfLengthBetween(10, 20));
} else {
return ActionStatus.Execution.successful(DateTime.now(DateTimeZone.UTC));
}
}
@Override
public org.elasticsearch.client.watcher.GetWatchResponse doHlrcParseInstance(XContentParser parser) throws IOException {
return org.elasticsearch.client.watcher.GetWatchResponse.fromXContent(parser);
}
@Override
public GetWatchResponse convertHlrcToInternal(org.elasticsearch.client.watcher.GetWatchResponse instance) {
if (instance.isFound()) {
return new GetWatchResponse(instance.getId(), instance.getVersion(), convertHlrcToInternal(instance.getStatus()),
new XContentSource(instance.getSource(), instance.getContentType()));
} else {
return new GetWatchResponse(instance.getId());
}
}
private static WatchStatus convertHlrcToInternal(org.elasticsearch.client.watcher.WatchStatus status) {
final Map<String, ActionStatus> actions = new HashMap<>();
for (Map.Entry<String, org.elasticsearch.client.watcher.ActionStatus> entry : status.getActions().entrySet()) {
actions.put(entry.getKey(), convertHlrcToInternal(entry.getValue()));
}
return new WatchStatus(status.version(),
convertHlrcToInternal(status.state()),
status.getExecutionState() == null ? null : convertHlrcToInternal(status.getExecutionState()),
status.lastChecked(), status.lastMetCondition(), actions, status.getHeaders()
);
}
private static ActionStatus convertHlrcToInternal(org.elasticsearch.client.watcher.ActionStatus actionStatus) {
return new ActionStatus(convertHlrcToInternal(actionStatus.ackStatus()),
actionStatus.lastExecution() == null ? null : convertHlrcToInternal(actionStatus.lastExecution()),
actionStatus.lastSuccessfulExecution() == null ? null : convertHlrcToInternal(actionStatus.lastSuccessfulExecution()),
actionStatus.lastThrottle() == null ? null : convertHlrcToInternal(actionStatus.lastThrottle())
);
}
private static ActionStatus.AckStatus convertHlrcToInternal(org.elasticsearch.client.watcher.ActionStatus.AckStatus ackStatus) {
return new ActionStatus.AckStatus(ackStatus.timestamp(), convertHlrcToInternal(ackStatus.state()));
}
private static ActionStatus.AckStatus.State convertHlrcToInternal(org.elasticsearch.client.watcher.ActionStatus.AckStatus.State state) {
return ActionStatus.AckStatus.State.valueOf(state.name());
}
private static WatchStatus.State convertHlrcToInternal(org.elasticsearch.client.watcher.WatchStatus.State state) {
return new WatchStatus.State(state.isActive(), state.getTimestamp());
}
private static ExecutionState convertHlrcToInternal(org.elasticsearch.client.watcher.ExecutionState executionState) {
return ExecutionState.valueOf(executionState.name());
}
private static ActionStatus.Execution convertHlrcToInternal(org.elasticsearch.client.watcher.ActionStatus.Execution execution) {
if (execution.successful()) {
return ActionStatus.Execution.successful(execution.timestamp());
} else {
return ActionStatus.Execution.failure(execution.timestamp(), execution.reason());
}
}
private static ActionStatus.Throttle convertHlrcToInternal(org.elasticsearch.client.watcher.ActionStatus.Throttle throttle) {
return new ActionStatus.Throttle(throttle.timestamp(), throttle.reason());
}
}

View File

@ -19,6 +19,7 @@ import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.watcher.support.xcontent.WatcherParams;
import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource;
import org.elasticsearch.xpack.core.watcher.transport.actions.get.GetWatchAction;
import org.elasticsearch.xpack.core.watcher.transport.actions.get.GetWatchRequest;
import org.elasticsearch.xpack.core.watcher.transport.actions.get.GetWatchResponse;
@ -71,7 +72,7 @@ public class TransportGetWatchAction extends WatcherTransportAction<GetWatchRequ
watch.version(getResponse.getVersion());
watch.status().version(getResponse.getVersion());
listener.onResponse(new GetWatchResponse(watch.id(), getResponse.getVersion(), watch.status(),
BytesReference.bytes(builder), XContentType.JSON));
new XContentSource(BytesReference.bytes(builder), XContentType.JSON)));
}
} else {
listener.onResponse(new GetWatchResponse(request.getId()));