Move watcher to use seq# and primary term for concurrency control (#37977)

* move watcher to seq# occ

* top level set

* fix parsing and missing setters

* share toXContent for PutResponse and rest end point

* fix redacted password

* fix username reference

* fix deactivate-watch.asciidoc have seq no references

* add seq# + term to activate-watch.asciidoc

* more doc fixes
This commit is contained in:
Boaz Leskes 2019-01-30 20:14:59 -05:00 committed by GitHub
parent dad41c2b7f
commit b11732104f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 624 additions and 127 deletions

View File

@ -76,6 +76,7 @@ import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.script.mustache.MultiSearchTemplateRequest;
import org.elasticsearch.script.mustache.SearchTemplateRequest;
@ -885,6 +886,20 @@ final class RequestConverters {
return this;
}
Params withIfSeqNo(long ifSeqNo) {
if (ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
return putParam("if_seq_no", Long.toString(ifSeqNo));
}
return this;
}
Params withIfPrimaryTerm(long ifPrimaryTerm) {
if (ifPrimaryTerm != SequenceNumbers.UNASSIGNED_PRIMARY_TERM) {
return putParam("if_primary_term", Long.toString(ifPrimaryTerm));
}
return this;
}
Params withWaitForActiveShards(ActiveShardCount activeShardCount) {
return withWaitForActiveShards(activeShardCount, ActiveShardCount.DEFAULT);
}

View File

@ -69,7 +69,10 @@ final class WatcherRequestConverters {
.build();
Request request = new Request(HttpPut.METHOD_NAME, endpoint);
RequestConverters.Params params = new RequestConverters.Params(request).withVersion(putWatchRequest.getVersion());
RequestConverters.Params params = new RequestConverters.Params(request)
.withVersion(putWatchRequest.getVersion())
.withIfSeqNo(putWatchRequest.ifSeqNo())
.withIfPrimaryTerm(putWatchRequest.ifPrimaryTerm());
if (putWatchRequest.isActive() == false) {
params.putParam("active", "false");
}

View File

@ -31,9 +31,14 @@ import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
public class GetWatchResponse {
private final String id;
private final long version;
private final long seqNo;
private final long primaryTerm;
private final WatchStatus status;
private final BytesReference source;
@ -43,15 +48,18 @@ public class GetWatchResponse {
* Ctor for missing watch
*/
public GetWatchResponse(String id) {
this(id, Versions.NOT_FOUND, null, null, null);
this(id, Versions.NOT_FOUND, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, null, null, null);
}
public GetWatchResponse(String id, long version, WatchStatus status, BytesReference source, XContentType xContentType) {
public GetWatchResponse(String id, long version, long seqNo, long primaryTerm, WatchStatus status,
BytesReference source, XContentType xContentType) {
this.id = id;
this.version = version;
this.status = status;
this.source = source;
this.xContentType = xContentType;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
}
public String getId() {
@ -62,6 +70,14 @@ public class GetWatchResponse {
return version;
}
public long getSeqNo() {
return seqNo;
}
public long getPrimaryTerm() {
return primaryTerm;
}
public boolean isFound() {
return version != Versions.NOT_FOUND;
}
@ -111,6 +127,8 @@ public class GetWatchResponse {
private static final ParseField ID_FIELD = new ParseField("_id");
private static final ParseField FOUND_FIELD = new ParseField("found");
private static final ParseField VERSION_FIELD = new ParseField("_version");
private static final ParseField SEQ_NO_FIELD = new ParseField("_seq_no");
private static final ParseField PRIMARY_TERM_FIELD = new ParseField("_primary_term");
private static final ParseField STATUS_FIELD = new ParseField("status");
private static final ParseField WATCH_FIELD = new ParseField("watch");
@ -119,9 +137,10 @@ public class GetWatchResponse {
a -> {
boolean isFound = (boolean) a[1];
if (isFound) {
XContentBuilder builder = (XContentBuilder) a[4];
XContentBuilder builder = (XContentBuilder) a[6];
BytesReference source = BytesReference.bytes(builder);
return new GetWatchResponse((String) a[0], (long) a[2], (WatchStatus) a[3], source, builder.contentType());
return new GetWatchResponse((String) a[0], (long) a[2], (long) a[3], (long) a[4], (WatchStatus) a[5],
source, builder.contentType());
} else {
return new GetWatchResponse((String) a[0]);
}
@ -131,6 +150,8 @@ public class GetWatchResponse {
PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_FIELD);
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), FOUND_FIELD);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), VERSION_FIELD);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), SEQ_NO_FIELD);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), PRIMARY_TERM_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(parser, context) -> WatchStatus.parse(parser), STATUS_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),

View File

@ -23,10 +23,14 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.seqno.SequenceNumbers;
import java.util.Objects;
import java.util.regex.Pattern;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
/**
* This request class contains the data needed to create a watch along with the name of the watch.
* The name of the watch will become the ID of the indexed document.
@ -40,6 +44,9 @@ public final class PutWatchRequest implements Validatable {
private final XContentType xContentType;
private boolean active = true;
private long version = Versions.MATCH_ANY;
private long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
public PutWatchRequest(String id, BytesReference source, XContentType xContentType) {
Objects.requireNonNull(id, "watch id is missing");
@ -96,6 +103,56 @@ public final class PutWatchRequest implements Validatable {
this.version = version;
}
/**
* only performs this put request if the watch's last modification was assigned the given
* sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)}
*
* If the watch's last modification was assigned a different sequence number a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public PutWatchRequest setIfSeqNo(long seqNo) {
if (seqNo < 0 && seqNo != UNASSIGNED_SEQ_NO) {
throw new IllegalArgumentException("sequence numbers must be non negative. got [" + seqNo + "].");
}
ifSeqNo = seqNo;
return this;
}
/**
* only performs this put request if the watch's last modification was assigned the given
* primary term. Must be used in combination with {@link #setIfSeqNo(long)}
*
* If the watch last modification was assigned a different term a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public PutWatchRequest setIfPrimaryTerm(long term) {
if (term < 0) {
throw new IllegalArgumentException("primary term must be non negative. got [" + term + "]");
}
ifPrimaryTerm = term;
return this;
}
/**
* If set, only perform this put watch request if the watch's last modification was assigned this sequence number.
* If the watch last last modification was assigned a different sequence number a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public long ifSeqNo() {
return ifSeqNo;
}
/**
* If set, only perform this put watch request if the watch's last modification was assigned this primary term.
*
* If the watch's last modification was assigned a different term a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public long ifPrimaryTerm() {
return ifPrimaryTerm;
}
public static boolean isValidId(String id) {
return Strings.isEmpty(id) == false && NO_WS_PATTERN.matcher(id).matches();
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.client.watcher;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.seqno.SequenceNumbers;
import java.io.IOException;
import java.util.Objects;
@ -32,20 +33,26 @@ public class PutWatchResponse {
static {
PARSER.declareString(PutWatchResponse::setId, new ParseField("_id"));
PARSER.declareLong(PutWatchResponse::setSeqNo, new ParseField("_seq_no"));
PARSER.declareLong(PutWatchResponse::setPrimaryTerm, new ParseField("_primary_term"));
PARSER.declareLong(PutWatchResponse::setVersion, new ParseField("_version"));
PARSER.declareBoolean(PutWatchResponse::setCreated, new ParseField("created"));
}
private String id;
private long version;
private long seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
private long primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
private boolean created;
public PutWatchResponse() {
}
public PutWatchResponse(String id, long version, boolean created) {
public PutWatchResponse(String id, long version, long seqNo, long primaryTerm, boolean created) {
this.id = id;
this.version = version;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.created = created;
}
@ -57,6 +64,14 @@ public class PutWatchResponse {
this.version = version;
}
private void setSeqNo(long seqNo) {
this.seqNo = seqNo;
}
private void setPrimaryTerm(long primaryTerm) {
this.primaryTerm = primaryTerm;
}
private void setCreated(boolean created) {
this.created = created;
}
@ -69,6 +84,14 @@ public class PutWatchResponse {
return version;
}
public long getSeqNo() {
return seqNo;
}
public long getPrimaryTerm() {
return primaryTerm;
}
public boolean isCreated() {
return created;
}
@ -80,12 +103,14 @@ public class PutWatchResponse {
PutWatchResponse that = (PutWatchResponse) o;
return Objects.equals(id, that.id) && Objects.equals(version, that.version) && Objects.equals(created, that.created);
return Objects.equals(id, that.id) && Objects.equals(version, that.version)
&& Objects.equals(seqNo, that.seqNo)
&& Objects.equals(primaryTerm, that.primaryTerm) && Objects.equals(created, that.created);
}
@Override
public int hashCode() {
return Objects.hash(id, version, created);
return Objects.hash(id, version, seqNo, primaryTerm, created);
}
public static PutWatchResponse fromXContent(XContentParser parser) throws IOException {

View File

@ -41,14 +41,18 @@ public class PutWatchResponseTests extends ESTestCase {
return builder.startObject()
.field("_id", response.getId())
.field("_version", response.getVersion())
.field("_seq_no", response.getSeqNo())
.field("_primary_term", response.getPrimaryTerm())
.field("created", response.isCreated())
.endObject();
}
private static PutWatchResponse createTestInstance() {
String id = randomAlphaOfLength(10);
long seqNo = randomNonNegativeLong();
long primaryTerm = randomLongBetween(1, 200);
long version = randomLongBetween(1, 10);
boolean created = randomBoolean();
return new PutWatchResponse(id, version, created);
return new PutWatchResponse(id, version, seqNo, primaryTerm, created);
}
}

View File

@ -92,6 +92,8 @@ The action state of a newly-created watch is `awaits_successful_execution`:
--------------------------------------------------
{
"found": true,
"_seq_no": 0,
"_primary_term": 1,
"_version": 1,
"_id": "my_watch",
"status": {
@ -136,6 +138,8 @@ and the action is now in `ackable` state:
{
"found": true,
"_id": "my_watch",
"_seq_no": 1,
"_primary_term": 1,
"_version": 2,
"status": {
"version": 2,
@ -185,6 +189,8 @@ GET _watcher/watch/my_watch
{
"found": true,
"_id": "my_watch",
"_seq_no": 2,
"_primary_term": 1,
"_version": 3,
"status": {
"version": 3,

View File

@ -44,6 +44,8 @@ GET _watcher/watch/my_watch
{
"found": true,
"_id": "my_watch",
"_seq_no": 0,
"_primary_term": 1,
"_version": 1,
"status": {
"state" : {

View File

@ -44,6 +44,8 @@ GET _watcher/watch/my_watch
"found": true,
"_id": "my_watch",
"_version": 1,
"_seq_no": 0,
"_primary_term": 1,
"status": {
"state" : {
"active" : true,

View File

@ -44,6 +44,8 @@ Response:
{
"found": true,
"_id": "my_watch",
"_seq_no": 0,
"_primary_term": 1,
"_version": 1,
"status": { <1>
"version": 1,

View File

@ -5,19 +5,24 @@
*/
package org.elasticsearch.protocol.xpack.watcher;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.seqno.SequenceNumbers;
import java.io.IOException;
import java.util.regex.Pattern;
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
/**
* This request class contains the data needed to create a watch along with the name of the watch.
* The name of the watch will become the ID of the indexed document.
@ -32,6 +37,9 @@ public final class PutWatchRequest extends ActionRequest {
private boolean active = true;
private long version = Versions.MATCH_ANY;
private long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
public PutWatchRequest() {}
public PutWatchRequest(StreamInput in) throws IOException {
@ -52,6 +60,13 @@ public final class PutWatchRequest extends ActionRequest {
active = in.readBoolean();
xContentType = in.readEnum(XContentType.class);
version = in.readZLong();
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
ifSeqNo = in.readZLong();
ifPrimaryTerm = in.readVLong();
} else {
ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
}
}
@Override
@ -62,6 +77,10 @@ public final class PutWatchRequest extends ActionRequest {
out.writeBoolean(active);
out.writeEnum(xContentType);
out.writeZLong(version);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeZLong(ifSeqNo);
out.writeVLong(ifPrimaryTerm);
}
}
/**
@ -122,20 +141,80 @@ public final class PutWatchRequest extends ActionRequest {
this.version = version;
}
/**
* only performs this put request if the watch's last modification was assigned the given
* sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)}
*
* If the watch's last modification was assigned a different sequence number a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public PutWatchRequest setIfSeqNo(long seqNo) {
if (seqNo < 0 && seqNo != UNASSIGNED_SEQ_NO) {
throw new IllegalArgumentException("sequence numbers must be non negative. got [" + seqNo + "].");
}
ifSeqNo = seqNo;
return this;
}
/**
* only performs this put request if the watch's last modification was assigned the given
* primary term. Must be used in combination with {@link #setIfSeqNo(long)}
*
* If the watch last modification was assigned a different term a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public PutWatchRequest setIfPrimaryTerm(long term) {
if (term < 0) {
throw new IllegalArgumentException("primary term must be non negative. got [" + term + "]");
}
ifPrimaryTerm = term;
return this;
}
/**
* If set, only perform this put watch request if the watch's last modification was assigned this sequence number.
* If the watch last last modification was assigned a different sequence number a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public long getIfSeqNo() {
return ifSeqNo;
}
/**
* If set, only perform this put watch request if the watch's last modification was assigned this primary term.
*
* If the watch's last modification was assigned a different term a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public long getIfPrimaryTerm() {
return ifPrimaryTerm;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (id == null) {
validationException = ValidateActions.addValidationError("watch id is missing", validationException);
validationException = addValidationError("watch id is missing", validationException);
} else if (isValidId(id) == false) {
validationException = ValidateActions.addValidationError("watch id contains whitespace", validationException);
validationException = addValidationError("watch id contains whitespace", validationException);
}
if (source == null) {
validationException = ValidateActions.addValidationError("watch source is missing", validationException);
validationException = addValidationError("watch source is missing", validationException);
}
if (xContentType == null) {
validationException = ValidateActions.addValidationError("request body is missing", validationException);
validationException = addValidationError("request body is missing", validationException);
}
if (ifSeqNo != UNASSIGNED_SEQ_NO && version != Versions.MATCH_ANY) {
validationException = addValidationError("compare and write operations can not use versioning", validationException);
}
if (ifPrimaryTerm == UNASSIGNED_PRIMARY_TERM && ifSeqNo != UNASSIGNED_SEQ_NO) {
validationException = addValidationError("ifSeqNo is set, but primary term is [0]", validationException);
}
if (ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM && ifSeqNo == UNASSIGNED_SEQ_NO) {
validationException =
addValidationError("ifSeqNo is unassigned, but primary term is [" + ifPrimaryTerm + "]", validationException);
}
return validationException;
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.protocol.xpack.watcher;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
@ -13,6 +14,7 @@ import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.seqno.SequenceNumbers;
import java.io.IOException;
import java.util.Objects;
@ -24,19 +26,25 @@ public class PutWatchResponse extends ActionResponse implements ToXContentObject
static {
PARSER.declareString(PutWatchResponse::setId, new ParseField("_id"));
PARSER.declareLong(PutWatchResponse::setVersion, new ParseField("_version"));
PARSER.declareLong(PutWatchResponse::setSeqNo, new ParseField("_seq_no"));
PARSER.declareLong(PutWatchResponse::setPrimaryTerm, new ParseField("_primary_term"));
PARSER.declareBoolean(PutWatchResponse::setCreated, new ParseField("created"));
}
private String id;
private long version;
private long seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
private long primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
private boolean created;
public PutWatchResponse() {
}
public PutWatchResponse(String id, long version, boolean created) {
public PutWatchResponse(String id, long version, long seqNo, long primaryTerm, boolean created) {
this.id = id;
this.version = version;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.created = created;
}
@ -48,6 +56,14 @@ public class PutWatchResponse extends ActionResponse implements ToXContentObject
this.version = version;
}
private void setSeqNo(long seqNo) {
this.seqNo = seqNo;
}
private void setPrimaryTerm(long primaryTerm) {
this.primaryTerm = primaryTerm;
}
private void setCreated(boolean created) {
this.created = created;
}
@ -60,6 +76,14 @@ public class PutWatchResponse extends ActionResponse implements ToXContentObject
return version;
}
public long getSeqNo() {
return seqNo;
}
public long getPrimaryTerm() {
return primaryTerm;
}
public boolean isCreated() {
return created;
}
@ -71,12 +95,14 @@ public class PutWatchResponse extends ActionResponse implements ToXContentObject
PutWatchResponse that = (PutWatchResponse) o;
return Objects.equals(id, that.id) && Objects.equals(version, that.version) && Objects.equals(created, that.created);
return Objects.equals(id, that.id) && Objects.equals(version, that.version)
&& Objects.equals(seqNo, that.seqNo)
&& Objects.equals(primaryTerm, that.primaryTerm) && Objects.equals(created, that.created);
}
@Override
public int hashCode() {
return Objects.hash(id, version, created);
return Objects.hash(id, version, seqNo, primaryTerm, created);
}
@Override
@ -84,6 +110,10 @@ public class PutWatchResponse extends ActionResponse implements ToXContentObject
super.writeTo(out);
out.writeString(id);
out.writeVLong(version);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeZLong(seqNo);
out.writeVLong(primaryTerm);
}
out.writeBoolean(created);
}
@ -92,6 +122,13 @@ public class PutWatchResponse extends ActionResponse implements ToXContentObject
super.readFrom(in);
id = in.readString();
version = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
seqNo = in.readZLong();
primaryTerm = in.readVLong();
} else {
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
}
created = in.readBoolean();
}
@ -100,6 +137,8 @@ public class PutWatchResponse extends ActionResponse implements ToXContentObject
return builder.startObject()
.field("_id", id)
.field("_version", version)
.field("_seq_no", seqNo)
.field("_primary_term", primaryTerm)
.field("created", created)
.endObject();
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.core.watcher.transport.actions.get;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
@ -12,6 +13,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource;
import org.elasticsearch.xpack.core.watcher.watch.WatchStatus;
@ -25,6 +27,8 @@ public class GetWatchResponse extends ActionResponse implements ToXContent {
private boolean found;
private XContentSource source;
private long version;
private long seqNo;
private long primaryTerm;
public GetWatchResponse() {
}
@ -38,17 +42,21 @@ public class GetWatchResponse extends ActionResponse implements ToXContent {
this.found = false;
this.source = null;
this.version = Versions.NOT_FOUND;
this.seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
this.primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
}
/**
* ctor for found watch
*/
public GetWatchResponse(String id, long version, WatchStatus status, XContentSource source) {
public GetWatchResponse(String id, long version, long seqNo, long primaryTerm, WatchStatus status, XContentSource source) {
this.id = id;
this.status = status;
this.found = true;
this.source = source;
this.version = version;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
}
public String getId() {
@ -71,6 +79,14 @@ public class GetWatchResponse extends ActionResponse implements ToXContent {
return version;
}
public long getSeqNo() {
return seqNo;
}
public long getPrimaryTerm() {
return primaryTerm;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -80,10 +96,16 @@ public class GetWatchResponse extends ActionResponse implements ToXContent {
status = WatchStatus.read(in);
source = XContentSource.readFrom(in);
version = in.readZLong();
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
seqNo = in.readZLong();
primaryTerm = in.readVLong();
}
} else {
status = null;
source = null;
version = Versions.NOT_FOUND;
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
}
}
@ -96,6 +118,10 @@ public class GetWatchResponse extends ActionResponse implements ToXContent {
status.writeTo(out);
XContentSource.writeTo(source, out);
out.writeZLong(version);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeZLong(seqNo);
out.writeVLong(primaryTerm);
}
}
}
@ -105,6 +131,8 @@ public class GetWatchResponse extends ActionResponse implements ToXContent {
builder.field("_id", id);
if (found) {
builder.field("_version", version);
builder.field("_seq_no", seqNo);
builder.field("_primary_term", primaryTerm);
builder.field("status", status, params);
builder.field("watch", source, params);
}
@ -116,7 +144,7 @@ public class GetWatchResponse extends ActionResponse implements ToXContent {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
GetWatchResponse that = (GetWatchResponse) o;
return version == that.version &&
return version == that.version && seqNo == that.seqNo && primaryTerm == that.primaryTerm &&
Objects.equals(id, that.id) &&
Objects.equals(status, that.status) &&
Objects.equals(source, that.source);
@ -124,7 +152,7 @@ public class GetWatchResponse extends ActionResponse implements ToXContent {
@Override
public int hashCode() {
return Objects.hash(id, status, version);
return Objects.hash(id, status, version, seqNo, primaryTerm);
}
@Override

View File

@ -9,6 +9,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.xpack.core.watcher.actions.ActionStatus;
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper;
import org.elasticsearch.xpack.core.watcher.condition.ExecutableCondition;
@ -37,11 +38,12 @@ public class Watch implements ToXContentObject {
@Nullable private final Map<String, Object> metadata;
private final WatchStatus status;
private transient long version;
private final long sourceSeqNo;
private final long sourcePrimaryTerm;
public Watch(String id, Trigger trigger, ExecutableInput input, ExecutableCondition condition, @Nullable ExecutableTransform transform,
@Nullable TimeValue throttlePeriod, List<ActionWrapper> actions, @Nullable Map<String, Object> metadata,
WatchStatus status, long version) {
WatchStatus status, long sourceSeqNo, long sourcePrimaryTerm) {
this.id = id;
this.trigger = trigger;
this.input = input;
@ -51,7 +53,8 @@ public class Watch implements ToXContentObject {
this.throttlePeriod = throttlePeriod;
this.metadata = metadata;
this.status = status;
this.version = version;
this.sourceSeqNo = sourceSeqNo;
this.sourcePrimaryTerm = sourcePrimaryTerm;
}
public String id() {
@ -88,12 +91,20 @@ public class Watch implements ToXContentObject {
return status;
}
public long version() {
return version;
/**
* The sequence number of the document that was used to create this watch, {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
* if the watch wasn't read from a document
***/
public long getSourceSeqNo() {
return sourceSeqNo;
}
public void version(long version) {
this.version = version;
/**
* The primary term of the document that was used to create this watch, {@link SequenceNumbers#UNASSIGNED_PRIMARY_TERM}
* if the watch wasn't read from a document
***/
public long getSourcePrimaryTerm() {
return sourcePrimaryTerm;
}
/**

View File

@ -17,7 +17,6 @@ public final class WatchField {
public static final ParseField THROTTLE_PERIOD_HUMAN = new ParseField("throttle_period");
public static final ParseField METADATA = new ParseField("metadata");
public static final ParseField STATUS = new ParseField("status");
public static final ParseField VERSION = new ParseField("_version");
public static final String ALL_ACTIONS_ID = "_all";
private WatchField() {}

View File

@ -74,6 +74,7 @@ public class GetWatchResponseTests extends
throw new AssertionError(e);
}
newInstance = new GetWatchResponse(newInstance.getId(), newInstance.getVersion(),
newInstance.getSeqNo(), newInstance.getPrimaryTerm(),
newInstance.getStatus(), new XContentSource(newSource, expectedInstance.getSource().getContentType()));
}
super.assertEqualInstances(expectedInstance, newInstance);
@ -91,9 +92,11 @@ public class GetWatchResponseTests extends
return new GetWatchResponse(id);
}
long version = randomLongBetween(0, 10);
long seqNo = randomNonNegativeLong();
long primaryTerm = randomLongBetween(1, 2000);
WatchStatus status = randomWatchStatus();
BytesReference source = simpleWatch();
return new GetWatchResponse(id, version, status, new XContentSource(source, XContentType.JSON));
return new GetWatchResponse(id, version, seqNo, primaryTerm, status, new XContentSource(source, XContentType.JSON));
}
private static BytesReference simpleWatch() {
@ -170,8 +173,8 @@ public class GetWatchResponseTests extends
@Override
public GetWatchResponse convertHlrcToInternal(org.elasticsearch.client.watcher.GetWatchResponse instance) {
if (instance.isFound()) {
return new GetWatchResponse(instance.getId(), instance.getVersion(), convertHlrcToInternal(instance.getStatus()),
new XContentSource(instance.getSource(), instance.getContentType()));
return new GetWatchResponse(instance.getId(), instance.getVersion(), instance.getSeqNo(), instance.getPrimaryTerm(),
convertHlrcToInternal(instance.getStatus()), new XContentSource(instance.getSource(), instance.getContentType()));
} else {
return new GetWatchResponse(instance.getId());
}

View File

@ -16,9 +16,11 @@ public class PutWatchResponseTests extends
@Override
protected PutWatchResponse createTestInstance() {
String id = randomAlphaOfLength(10);
long seqNo = randomNonNegativeLong();
long primaryTerm = randomLongBetween(1, 20);
long version = randomLongBetween(1, 10);
boolean created = randomBoolean();
return new PutWatchResponse(id, version, created);
return new PutWatchResponse(id, version, seqNo, primaryTerm, created);
}
@Override
@ -33,7 +35,8 @@ public class PutWatchResponseTests extends
@Override
public PutWatchResponse convertHlrcToInternal(org.elasticsearch.client.watcher.PutWatchResponse instance) {
return new PutWatchResponse(instance.getId(), instance.getVersion(), instance.isCreated());
return new PutWatchResponse(instance.getId(), instance.getVersion(), instance.getSeqNo(), instance.getPrimaryTerm(),
instance.isCreated());
}
@Override

View File

@ -20,6 +20,14 @@
"version" : {
"type" : "number",
"description" : "Explicit version number for concurrency control"
},
"if_seq_no" : {
"type" : "number",
"description" : "only update the watch if the last operation that has changed the watch has the specified sequence number"
},
"if_primary_term" : {
"type" : "number",
"description" : "only update the watch if the last operation that has changed the watch has the specified primary term"
}
}
},

View File

@ -221,6 +221,175 @@ setup:
}
}
---
"Test putting a watch with a redacted password with old seq no returns an error":
- skip:
version: " - 6.99.99"
reason: seq no powered concurrency was added in 7.0.0
# version 1
- do:
xpack.watcher.put_watch:
id: "watch_with_seq_no"
body: >
{
"trigger": {
"schedule" : { "cron" : "0 0 0 1 * ? 2099" }
},
"input": {
"http" : {
"request" : {
"host" : "host.domain",
"port" : 9200,
"path" : "/myservice",
"auth" : {
"basic" : {
"username" : "user",
"password" : "pass"
}
}
}
}
},
"actions": {
"logging": {
"logging": {
"text": "Log me Amadeus!"
}
}
}
}
- set: { "_seq_no": seqNo }
- set: { "_primary_term" : primaryTerm }
# using optimistic concurrency control, this one will loose
# as if two users in the watch UI tried to update the same watch
- do:
catch: conflict
xpack.watcher.put_watch:
id: "watch_with_seq_no"
if_seq_no: 123034
if_primary_term: $primaryTerm
body: >
{
"trigger": {
"schedule" : { "cron" : "0 0 0 1 * ? 2099" }
},
"input": {
"http" : {
"request" : {
"host" : "host.domain",
"port" : 9200,
"path" : "/myservice",
"auth" : {
"basic" : {
"username" : "user",
"password" : "::es_redacted::"
}
}
}
}
},
"actions": {
"logging": {
"logging": {
"text": "Log me Amadeus!"
}
}
}
}
- do:
catch: conflict
xpack.watcher.put_watch:
id: "watch_with_seq_no"
if_seq_no: $seqNo
if_primary_term: 234242423
body: >
{
"trigger": {
"schedule" : { "cron" : "0 0 0 1 * ? 2099" }
},
"input": {
"http" : {
"request" : {
"host" : "host.domain",
"port" : 9200,
"path" : "/myservice",
"auth" : {
"basic" : {
"username" : "user",
"password" : "::es_redacted::"
}
}
}
}
},
"actions": {
"logging": {
"logging": {
"text": "Log me Amadeus!"
}
}
}
}
- do:
xpack.watcher.put_watch:
id: "watch_with_seq_no"
if_seq_no: $seqNo
if_primary_term: $primaryTerm
body: >
{
"trigger": {
"schedule" : { "cron" : "0 0 0 1 * ? 2099" }
},
"input": {
"http" : {
"request" : {
"host" : "host.domain",
"port" : 9200,
"path" : "/myservice",
"auth" : {
"basic" : {
"username" : "new_user",
"password" : "::es_redacted::"
}
}
}
}
},
"actions": {
"logging": {
"logging": {
"text": "Log me Amadeus!"
}
}
}
}
- do:
search:
rest_total_hits_as_int: true
index: .watches
body: >
{
"query": {
"term": {
"_id": {
"value": "watch_with_seq_no"
}
}
}
}
- match: { hits.total: 1 }
- match: { hits.hits.0._id: "watch_with_seq_no" }
- match: { hits.hits.0._source.input.http.request.auth.basic.username: "new_user" }
- match: { hits.hits.0._source.input.http.request.auth.basic.password: "pass" }
---
"Test putting a watch with a redacted password with current version works":

View File

@ -103,7 +103,8 @@ final class WatcherIndexingListener implements IndexingOperationListener, Cluste
if (isWatchDocument(shardId.getIndexName(), operation.type())) {
DateTime now = new DateTime(clock.millis(), UTC);
try {
Watch watch = parser.parseWithSecrets(operation.id(), true, operation.source(), now, XContentType.JSON);
Watch watch = parser.parseWithSecrets(operation.id(), true, operation.source(), now, XContentType.JSON,
operation.getIfSeqNo(), operation.getIfPrimaryTerm());
ShardAllocationConfiguration shardAllocationConfiguration = configuration.localShards.get(shardId);
if (shardAllocationConfiguration == null) {
logger.debug("no distributed watch execution info found for watch [{}] on shard [{}], got configuration for {}",

View File

@ -298,7 +298,7 @@ public class WatcherService {
.source(new SearchSourceBuilder()
.size(scrollSize)
.sort(SortBuilders.fieldSort("_doc"))
.version(true));
.seqNoAndPrimaryTerm(true));
response = client.search(searchRequest).actionGet(defaultSearchTimeout);
if (response.getTotalShards() != response.getSuccessfulShards()) {
@ -341,8 +341,7 @@ public class WatcherService {
}
try {
Watch watch = parser.parse(id, true, hit.getSourceRef(), XContentType.JSON);
watch.version(hit.getVersion());
Watch watch = parser.parse(id, true, hit.getSourceRef(), XContentType.JSON, hit.getSeqNo(), hit.getPrimaryTerm());
if (watch.status().state().isActive()) {
watches.add(watch);
}

View File

@ -282,7 +282,8 @@ public class ExecutionService {
if (resp.isExists() == false) {
throw new ResourceNotFoundException("watch [{}] does not exist", watchId);
}
return parser.parseWithSecrets(watchId, true, resp.getSourceAsBytesRef(), ctx.executionTime(), XContentType.JSON);
return parser.parseWithSecrets(watchId, true, resp.getSourceAsBytesRef(), ctx.executionTime(), XContentType.JSON,
resp.getSeqNo(), resp.getPrimaryTerm());
});
} catch (ResourceNotFoundException e) {
String message = "unable to find watch for record [" + ctx.id() + "]";
@ -353,7 +354,8 @@ public class ExecutionService {
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, watch.id());
updateRequest.doc(source);
updateRequest.version(watch.version());
updateRequest.setIfSeqNo(watch.getSourceSeqNo());
updateRequest.setIfPrimaryTerm(watch.getSourcePrimaryTerm());
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
client.update(updateRequest).actionGet(indexDefaultTimeout);
} catch (DocumentMissingException e) {

View File

@ -50,8 +50,7 @@ public class RestExecuteWatchAction extends WatcherRestHandler implements RestRe
WatchField.INPUT.getPreferredName(), WatchField.CONDITION.getPreferredName(),
WatchField.ACTIONS.getPreferredName(), WatchField.TRANSFORM.getPreferredName(),
WatchField.THROTTLE_PERIOD.getPreferredName(), WatchField.THROTTLE_PERIOD_HUMAN.getPreferredName(),
WatchField.METADATA.getPreferredName(), WatchField.STATUS.getPreferredName(),
WatchField.VERSION.getPreferredName());
WatchField.METADATA.getPreferredName(), WatchField.STATUS.getPreferredName());
public RestExecuteWatchAction(Settings settings, RestController controller) {
super(settings);

View File

@ -9,7 +9,6 @@ package org.elasticsearch.xpack.watcher.rest.action;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController;
@ -22,7 +21,6 @@ import org.elasticsearch.xpack.core.watcher.transport.actions.get.GetWatchReques
import org.elasticsearch.xpack.core.watcher.transport.actions.get.GetWatchResponse;
import org.elasticsearch.xpack.watcher.rest.WatcherRestHandler;
import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.rest.RestStatus.NOT_FOUND;
import static org.elasticsearch.rest.RestStatus.OK;
@ -50,17 +48,9 @@ public class RestGetWatchAction extends WatcherRestHandler {
return channel -> client.getWatch(getWatchRequest, new RestBuilderListener<GetWatchResponse>(channel) {
@Override
public RestResponse buildResponse(GetWatchResponse response, XContentBuilder builder) throws Exception {
builder.startObject()
.field("found", response.isFound())
.field("_id", response.getId());
if (response.isFound()) {
builder.field("_version", response.getVersion());
ToXContent.MapParams xContentParams = new ToXContent.MapParams(request.params());
builder.field("status", response.getStatus(), xContentParams);
builder.field("watch", response.getSource(), xContentParams);
}
builder.startObject();
response.toXContent(builder, request);
builder.endObject();
RestStatus status = response.isFound() ? OK : NOT_FOUND;
return new BytesRestResponse(status, builder);
}

View File

@ -58,15 +58,13 @@ public class RestPutWatchAction extends WatcherRestHandler implements RestReques
PutWatchRequest putWatchRequest =
new PutWatchRequest(request.param("id"), request.content(), request.getXContentType());
putWatchRequest.setVersion(request.paramAsLong("version", Versions.MATCH_ANY));
putWatchRequest.setIfSeqNo(request.paramAsLong("if_seq_no", putWatchRequest.getIfSeqNo()));
putWatchRequest.setIfPrimaryTerm(request.paramAsLong("if_primary_term", putWatchRequest.getIfPrimaryTerm()));
putWatchRequest.setActive(request.paramAsBoolean("active", putWatchRequest.isActive()));
return channel -> client.putWatch(putWatchRequest, new RestBuilderListener<PutWatchResponse>(channel) {
@Override
public RestResponse buildResponse(PutWatchResponse response, XContentBuilder builder) throws Exception {
builder.startObject()
.field("_id", response.getId())
.field("_version", response.getVersion())
.field("created", response.isCreated())
.endObject();
response.toXContent(builder, request);
RestStatus status = response.isCreated() ? CREATED : OK;
return new BytesRestResponse(status, builder);
}

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.watcher.transport.actions.ack;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
@ -16,6 +17,7 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -49,15 +51,17 @@ public class TransportAckWatchAction extends WatcherTransportAction<AckWatchRequ
private final Clock clock;
private final WatchParser parser;
private final Client client;
private final ClusterService clusterService;
@Inject
public TransportAckWatchAction(TransportService transportService, ActionFilters actionFilters,
Clock clock, XPackLicenseState licenseState, WatchParser parser,
Client client) {
Client client, ClusterService clusterService) {
super(AckWatchAction.NAME, transportService, actionFilters, licenseState, AckWatchRequest::new);
this.clock = clock;
this.parser = parser;
this.client = client;
this.clusterService = clusterService;
}
@Override
@ -82,8 +86,7 @@ public class TransportAckWatchAction extends WatcherTransportAction<AckWatchRequ
} else {
DateTime now = new DateTime(clock.millis(), UTC);
Watch watch = parser.parseWithSecrets(request.getWatchId(), true, getResponse.getSourceAsBytesRef(),
now, XContentType.JSON);
watch.version(getResponse.getVersion());
now, XContentType.JSON, getResponse.getSeqNo(), getResponse.getPrimaryTerm());
watch.status().version(getResponse.getVersion());
String[] actionIds = request.getActionIds();
if (actionIds == null || actionIds.length == 0) {
@ -99,7 +102,12 @@ public class TransportAckWatchAction extends WatcherTransportAction<AckWatchRequ
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId());
// this may reject this action, but prevents concurrent updates from a watch execution
if (clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_7_0_0)) {
updateRequest.setIfSeqNo(getResponse.getSeqNo());
updateRequest.setIfPrimaryTerm(getResponse.getPrimaryTerm());
} else {
updateRequest.version(getResponse.getVersion());
}
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
XContentBuilder builder = jsonBuilder();
builder.startObject()

View File

@ -79,8 +79,7 @@ public class TransportActivateWatchAction extends WatcherTransportAction<Activat
ActionListener.<GetResponse>wrap(getResponse -> {
if (getResponse.isExists()) {
Watch watch = parser.parseWithSecrets(request.getWatchId(), true, getResponse.getSourceAsBytesRef(), now,
XContentType.JSON);
watch.version(getResponse.getVersion());
XContentType.JSON, getResponse.getSeqNo(), getResponse.getPrimaryTerm());
watch.status().version(getResponse.getVersion());
listener.onResponse(new ActivateWatchResponse(watch.status()));
} else {

View File

@ -19,6 +19,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -86,9 +87,8 @@ public class TransportExecuteWatchAction extends WatcherTransportAction<ExecuteW
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, getRequest,
ActionListener.<GetResponse>wrap(response -> {
if (response.isExists()) {
Watch watch =
watchParser.parse(request.getId(), true, response.getSourceAsBytesRef(), request.getXContentType());
watch.version(response.getVersion());
Watch watch = watchParser.parse(request.getId(), true, response.getSourceAsBytesRef(),
request.getXContentType(), response.getSeqNo(), response.getPrimaryTerm());
watch.status().version(response.getVersion());
executeWatch(request, listener, watch, true);
} else {
@ -99,7 +99,7 @@ public class TransportExecuteWatchAction extends WatcherTransportAction<ExecuteW
try {
assert !request.isRecordExecution();
Watch watch = watchParser.parse(ExecuteWatchRequest.INLINE_WATCH_ID, true, request.getWatchSource(),
request.getXContentType());
request.getXContentType(), SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
executeWatch(request, listener, watch, false);
} catch (IOException e) {
logger.error(new ParameterizedMessage("failed to parse [{}]", request.getId()), e);

View File

@ -64,15 +64,15 @@ public class TransportGetWatchAction extends WatcherTransportAction<GetWatchRequ
// so that it indicates the the status is managed by watcher itself.
DateTime now = new DateTime(clock.millis(), UTC);
Watch watch = parser.parseWithSecrets(request.getId(), true, getResponse.getSourceAsBytesRef(), now,
XContentType.JSON);
XContentType.JSON, getResponse.getSeqNo(), getResponse.getPrimaryTerm());
watch.toXContent(builder, WatcherParams.builder()
.hideSecrets(true)
.includeStatus(false)
.build());
watch.version(getResponse.getVersion());
watch.status().version(getResponse.getVersion());
listener.onResponse(new GetWatchResponse(watch.id(), getResponse.getVersion(), watch.status(),
new XContentSource(BytesReference.bytes(builder), XContentType.JSON)));
listener.onResponse(new GetWatchResponse(watch.id(), getResponse.getVersion(),
watch.getSourceSeqNo(), watch.getSourcePrimaryTerm(),
watch.status(), new XContentSource(BytesReference.bytes(builder), XContentType.JSON)));
}
} else {
listener.onResponse(new GetWatchResponse(request.getId()));

View File

@ -17,6 +17,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.protocol.xpack.watcher.PutWatchRequest;
import org.elasticsearch.protocol.xpack.watcher.PutWatchResponse;
@ -77,8 +78,9 @@ public class TransportPutWatchAction extends WatcherTransportAction<PutWatchRequ
protected void doExecute(PutWatchRequest request, ActionListener<PutWatchResponse> listener) {
try {
DateTime now = new DateTime(clock.millis(), UTC);
boolean isUpdate = request.getVersion() > 0;
Watch watch = parser.parseWithSecrets(request.getId(), false, request.getSource(), now, request.xContentType(), isUpdate);
boolean isUpdate = request.getVersion() > 0 || request.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO;
Watch watch = parser.parseWithSecrets(request.getId(), false, request.getSource(), now, request.xContentType(),
isUpdate, request.getIfSeqNo(), request.getIfPrimaryTerm());
watch.setState(request.isActive(), now);
// ensure we only filter for the allowed headers
@ -92,14 +94,20 @@ public class TransportPutWatchAction extends WatcherTransportAction<PutWatchRequ
if (isUpdate) {
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId());
if (request.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
updateRequest.setIfSeqNo(request.getIfSeqNo());
updateRequest.setIfPrimaryTerm(request.getIfPrimaryTerm());
} else {
updateRequest.version(request.getVersion());
}
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.doc(builder);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, updateRequest,
ActionListener.<UpdateResponse>wrap(response -> {
boolean created = response.getResult() == DocWriteResponse.Result.CREATED;
listener.onResponse(new PutWatchResponse(response.getId(), response.getVersion(), created));
listener.onResponse(new PutWatchResponse(response.getId(), response.getVersion(),
response.getSeqNo(), response.getPrimaryTerm(), created));
}, listener::onFailure),
client::update);
} else {
@ -109,7 +117,8 @@ public class TransportPutWatchAction extends WatcherTransportAction<PutWatchRequ
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, indexRequest,
ActionListener.<IndexResponse>wrap(response -> {
boolean created = response.getResult() == DocWriteResponse.Result.CREATED;
listener.onResponse(new PutWatchResponse(response.getId(), response.getVersion(), created));
listener.onResponse(new PutWatchResponse(response.getId(), response.getVersion(),
response.getSeqNo(), response.getPrimaryTerm(), created));
}, listener::onFailure),
client::index);
}

View File

@ -10,7 +10,6 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -74,13 +73,15 @@ public class WatchParser {
this.defaultActions = Collections.emptyList();
}
public Watch parse(String name, boolean includeStatus, BytesReference source, XContentType xContentType) throws IOException {
return parse(name, includeStatus, false, source, new DateTime(clock.millis(), UTC), xContentType, false);
public Watch parse(String name, boolean includeStatus, BytesReference source, XContentType xContentType,
long sourceSeqNo, long sourcePrimaryTerm) throws IOException {
return parse(name, includeStatus, false, source, new DateTime(clock.millis(), UTC), xContentType, false,
sourceSeqNo, sourcePrimaryTerm);
}
public Watch parse(String name, boolean includeStatus, BytesReference source, DateTime now,
XContentType xContentType) throws IOException {
return parse(name, includeStatus, false, source, now, xContentType, false);
XContentType xContentType, long sourceSeqNo, long sourcePrimaryTerm) throws IOException {
return parse(name, includeStatus, false, source, now, xContentType, false, sourceSeqNo, sourcePrimaryTerm);
}
/**
@ -95,17 +96,20 @@ public class WatchParser {
*
*/
public Watch parseWithSecrets(String id, boolean includeStatus, BytesReference source, DateTime now,
XContentType xContentType, boolean allowRedactedPasswords) throws IOException {
return parse(id, includeStatus, true, source, now, xContentType, allowRedactedPasswords);
XContentType xContentType, boolean allowRedactedPasswords, long sourceSeqNo, long sourcePrimaryTerm
) throws IOException {
return parse(id, includeStatus, true, source, now, xContentType, allowRedactedPasswords, sourceSeqNo, sourcePrimaryTerm);
}
public Watch parseWithSecrets(String id, boolean includeStatus, BytesReference source, DateTime now,
XContentType xContentType) throws IOException {
return parse(id, includeStatus, true, source, now, xContentType, false);
XContentType xContentType, long sourceSeqNo, long sourcePrimaryTerm) throws IOException {
return parse(id, includeStatus, true, source, now, xContentType, false, sourceSeqNo, sourcePrimaryTerm);
}
private Watch parse(String id, boolean includeStatus, boolean withSecrets, BytesReference source, DateTime now,
XContentType xContentType, boolean allowRedactedPasswords) throws IOException {
XContentType xContentType, boolean allowRedactedPasswords, long sourceSeqNo, long sourcePrimaryTerm)
throws IOException {
if (logger.isTraceEnabled()) {
logger.trace("parsing watch [{}] ", source.utf8ToString());
}
@ -115,13 +119,14 @@ public class WatchParser {
LoggingDeprecationHandler.INSTANCE, stream),
now, withSecrets ? cryptoService : null, allowRedactedPasswords)) {
parser.nextToken();
return parse(id, includeStatus, parser);
return parse(id, includeStatus, parser, sourceSeqNo, sourcePrimaryTerm);
} catch (IOException ioe) {
throw ioException("could not parse watch [{}]", ioe, id);
}
}
public Watch parse(String id, boolean includeStatus, WatcherXContentParser parser) throws IOException {
public Watch parse(String id, boolean includeStatus, WatcherXContentParser parser, long sourceSeqNo, long sourcePrimaryTerm)
throws IOException {
Trigger trigger = null;
ExecutableInput input = defaultInput;
ExecutableCondition condition = defaultCondition;
@ -130,7 +135,6 @@ public class WatchParser {
TimeValue throttlePeriod = null;
Map<String, Object> metatdata = null;
WatchStatus status = null;
long version = Versions.MATCH_ANY;
String currentFieldName = null;
XContentParser.Token token;
@ -163,8 +167,6 @@ public class WatchParser {
actions = actionRegistry.parseActions(id, parser);
} else if (WatchField.METADATA.match(currentFieldName, parser.getDeprecationHandler())) {
metatdata = parser.map();
} else if (WatchField.VERSION.match(currentFieldName, parser.getDeprecationHandler())) {
version = parser.longValue();
} else if (WatchField.STATUS.match(currentFieldName, parser.getDeprecationHandler())) {
if (includeStatus) {
status = WatchStatus.parse(id, parser);
@ -198,6 +200,7 @@ public class WatchParser {
}
return new Watch(id, trigger, input, condition, transform, throttlePeriod, actions, metatdata, status, version);
return new Watch(
id, trigger, input, condition, transform, throttlePeriod, actions, metatdata, status, sourceSeqNo, sourcePrimaryTerm);
}
}

View File

@ -63,6 +63,7 @@ import static org.hamcrest.Matchers.not;
import static org.hamcrest.core.Is.is;
import static org.joda.time.DateTimeZone.UTC;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
@ -134,13 +135,13 @@ public class WatcherIndexingListenerTests extends ESTestCase {
boolean watchActive = randomBoolean();
boolean isNewWatch = randomBoolean();
Watch watch = mockWatch("_id", watchActive, isNewWatch);
when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject())).thenReturn(watch);
when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong())).thenReturn(watch);
Engine.Index returnedOperation = listener.preIndex(shardId, operation);
assertThat(returnedOperation, is(operation));
DateTime now = new DateTime(clock.millis(), UTC);
verify(parser).parseWithSecrets(eq(operation.id()), eq(true), eq(BytesArray.EMPTY), eq(now), anyObject());
verify(parser).parseWithSecrets(eq(operation.id()), eq(true), eq(BytesArray.EMPTY), eq(now), anyObject(), anyLong(), anyLong());
if (isNewWatch) {
if (watchActive) {
@ -162,7 +163,7 @@ public class WatcherIndexingListenerTests extends ESTestCase {
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
when(operation.type()).thenReturn(Watch.DOC_TYPE);
when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject())).thenReturn(watch);
when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong())).thenReturn(watch);
for (int idx = 0; idx < totalShardCount; idx++) {
final Map<ShardId, ShardAllocationConfiguration> localShards = new HashMap<>();
@ -207,7 +208,7 @@ public class WatcherIndexingListenerTests extends ESTestCase {
when(operation.id()).thenReturn(id);
when(operation.source()).thenReturn(BytesArray.EMPTY);
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject()))
when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong()))
.thenThrow(new IOException("self thrown"));
ElasticsearchParseException exc = expectThrows(ElasticsearchParseException.class,

View File

@ -70,6 +70,7 @@ import static java.util.Arrays.asList;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@ -194,7 +195,7 @@ public class WatcherServiceTests extends ESTestCase {
Watch watch = mock(Watch.class);
when(watchStatus.state()).thenReturn(state);
when(watch.status()).thenReturn(watchStatus);
when(parser.parse(eq(id), eq(true), any(), eq(XContentType.JSON))).thenReturn(watch);
when(parser.parse(eq(id), eq(true), any(), eq(XContentType.JSON), anyLong(), anyLong())).thenReturn(watch);
}
SearchHits searchHits = new SearchHits(hits, new TotalHits(count, TotalHits.Relation.EQUAL_TO), 1.0f);
SearchResponseSections sections = new SearchResponseSections(searchHits, null, null, false, false, null, 1);

View File

@ -101,6 +101,7 @@ import static org.hamcrest.Matchers.sameInstance;
import static org.joda.time.DateTime.now;
import static org.joda.time.DateTimeZone.UTC;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
@ -166,7 +167,7 @@ public class ExecutionServiceTests extends ESTestCase {
DateTime now = new DateTime(clock.millis());
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5));
when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch);
when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch);
Condition.Result conditionResult = InternalAlwaysCondition.RESULT_INSTANCE;
ExecutableCondition condition = mock(ExecutableCondition.class);
@ -270,7 +271,7 @@ public class ExecutionServiceTests extends ESTestCase {
DateTime now = new DateTime(clock.millis());
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5));
when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch);
when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch);
input = mock(ExecutableInput.class);
Input.Result inputResult = mock(Input.Result.class);
@ -339,7 +340,7 @@ public class ExecutionServiceTests extends ESTestCase {
DateTime now = new DateTime(clock.millis());
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5));
when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch);
when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch);
ExecutableCondition condition = mock(ExecutableCondition.class);
Condition.Result conditionResult = mock(Condition.Result.class);
@ -404,7 +405,7 @@ public class ExecutionServiceTests extends ESTestCase {
DateTime now = new DateTime(clock.millis());
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5));
when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch);
when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch);
Condition.Result conditionResult = InternalAlwaysCondition.RESULT_INSTANCE;
ExecutableCondition condition = mock(ExecutableCondition.class);
@ -464,7 +465,7 @@ public class ExecutionServiceTests extends ESTestCase {
GetResponse getResponse = mock(GetResponse.class);
when(getResponse.isExists()).thenReturn(true);
mockGetWatchResponse(client, "_id", getResponse);
when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch);
when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch);
DateTime now = new DateTime(clock.millis());
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
@ -838,7 +839,7 @@ public class ExecutionServiceTests extends ESTestCase {
when(getResponse.isExists()).thenReturn(true);
when(getResponse.getId()).thenReturn("foo");
mockGetWatchResponse(client, "foo", getResponse);
when(parser.parseWithSecrets(eq("foo"), eq(true), any(), any(), any())).thenReturn(watch);
when(parser.parseWithSecrets(eq("foo"), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch);
// execute needs to fail as well as storing the history
doThrow(new EsRejectedExecutionException()).when(executor).execute(any());
@ -971,7 +972,7 @@ public class ExecutionServiceTests extends ESTestCase {
GetResponse getResponse = mock(GetResponse.class);
when(getResponse.isExists()).thenReturn(true);
mockGetWatchResponse(client, "_id", getResponse);
when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch);
when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch);
WatchRecord.MessageWatchRecord record = mock(WatchRecord.MessageWatchRecord.class);
when(record.state()).thenReturn(ExecutionState.EXECUTION_NOT_NEEDED);
@ -1017,7 +1018,7 @@ public class ExecutionServiceTests extends ESTestCase {
public void testUpdateWatchStatusDoesNotUpdateState() throws Exception {
WatchStatus status = new WatchStatus(DateTime.now(UTC), Collections.emptyMap());
Watch watch = new Watch("_id", new ManualTrigger(), new ExecutableNoneInput(), InternalAlwaysCondition.INSTANCE, null, null,
Collections.emptyList(), null, status, 1L);
Collections.emptyList(), null, status, 1L, 1L);
final AtomicBoolean assertionsTriggered = new AtomicBoolean(false);
doAnswer(invocation -> {

View File

@ -55,7 +55,6 @@ import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.joda.time.DateTime;
import javax.mail.internet.AddressException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@ -134,7 +133,7 @@ public final class WatcherTestUtils {
null,
new ArrayList<>(),
null,
new WatchStatus(new DateTime(0, UTC), emptyMap()), 1L);
new WatchStatus(new DateTime(0, UTC), emptyMap()), 1L, 1L);
TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(),
new DateTime(0, UTC),
new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)),
@ -181,7 +180,7 @@ public final class WatcherTestUtils {
new TimeValue(0),
actions,
Collections.singletonMap("foo", "bar"),
new WatchStatus(now, statuses), 1L);
new WatchStatus(now, statuses), 1L, 1L);
}
public static SearchType getRandomSupportedSearchType() {

View File

@ -57,7 +57,7 @@ public class ScheduleEngineTriggerBenchmark {
List<Watch> watches = new ArrayList<>(numWatches);
for (int i = 0; i < numWatches; i++) {
watches.add(new Watch("job_" + i, new ScheduleTrigger(interval(interval + "s")), new ExecutableNoneInput(),
InternalAlwaysCondition.INSTANCE, null, null, Collections.emptyList(), null, null, 1L));
InternalAlwaysCondition.INSTANCE, null, null, Collections.emptyList(), null, null, 1L, 1L));
}
ScheduleRegistry scheduleRegistry = new ScheduleRegistry(emptySet());

View File

@ -110,7 +110,8 @@ public class WatchAckTests extends AbstractWatcherIntegrationTestCase {
GetWatchResponse getWatchResponse = watcherClient().prepareGetWatch("_id").get();
assertThat(getWatchResponse.isFound(), is(true));
Watch parsedWatch = watchParser().parse(getWatchResponse.getId(), true, getWatchResponse.getSource().getBytes(), XContentType.JSON);
Watch parsedWatch = watchParser().parse(getWatchResponse.getId(), true, getWatchResponse.getSource().getBytes(),
XContentType.JSON, getWatchResponse.getSeqNo(), getWatchResponse.getPrimaryTerm());
assertThat(parsedWatch.status().actionStatus("_a1").ackStatus().state(),
is(ActionStatus.AckStatus.State.AWAITS_SUCCESSFUL_EXECUTION));
assertThat(parsedWatch.status().actionStatus("_a2").ackStatus().state(),
@ -178,7 +179,8 @@ public class WatchAckTests extends AbstractWatcherIntegrationTestCase {
GetWatchResponse getWatchResponse = watcherClient().prepareGetWatch("_id").get();
assertThat(getWatchResponse.isFound(), is(true));
Watch parsedWatch = watchParser().parse(getWatchResponse.getId(), true, getWatchResponse.getSource().getBytes(), XContentType.JSON);
Watch parsedWatch = watchParser().parse(getWatchResponse.getId(), true,
getWatchResponse.getSource().getBytes(), XContentType.JSON, getWatchResponse.getSeqNo(), getWatchResponse.getPrimaryTerm());
assertThat(parsedWatch.status().actionStatus("_a1").ackStatus().state(),
is(ActionStatus.AckStatus.State.AWAITS_SUCCESSFUL_EXECUTION));
assertThat(parsedWatch.status().actionStatus("_a2").ackStatus().state(),
@ -220,7 +222,8 @@ public class WatchAckTests extends AbstractWatcherIntegrationTestCase {
refresh();
GetResponse getResponse = client().get(new GetRequest(Watch.INDEX, Watch.DOC_TYPE, "_name")).actionGet();
Watch indexedWatch = watchParser().parse("_name", true, getResponse.getSourceAsBytesRef(), XContentType.JSON);
Watch indexedWatch = watchParser().parse("_name", true, getResponse.getSourceAsBytesRef(), XContentType.JSON,
getResponse.getSeqNo(), getResponse.getPrimaryTerm());
assertThat(watchResponse.getStatus().actionStatus("_id").ackStatus().state(),
equalTo(indexedWatch.status().actionStatus("_id").ackStatus().state()));

View File

@ -39,6 +39,7 @@ import java.util.Collections;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
@ -61,7 +62,7 @@ public class TransportAckWatchActionTests extends ESTestCase {
client = mock(Client.class);
when(client.threadPool()).thenReturn(threadPool);
action = new TransportAckWatchAction(transportService, new ActionFilters(Collections.emptySet()),
Clock.systemUTC(), new XPackLicenseState(Settings.EMPTY), watchParser, client);
Clock.systemUTC(), new XPackLicenseState(Settings.EMPTY), watchParser, client, createClusterService(threadPool));
}
public void testWatchNotFound() {

View File

@ -36,6 +36,7 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
@ -57,7 +58,8 @@ public class TransportPutWatchActionTests extends ESTestCase {
TransportService transportService = mock(TransportService.class);
WatchParser parser = mock(WatchParser.class);
when(parser.parseWithSecrets(eq("_id"), eq(false), anyObject(), anyObject(), anyObject(), anyBoolean())).thenReturn(watch);
when(parser.parseWithSecrets(eq("_id"), eq(false), anyObject(), anyObject(), anyObject(), anyBoolean(), anyLong(), anyLong()))
.thenReturn(watch);
Client client = mock(Client.class);
when(client.threadPool()).thenReturn(threadPool);

View File

@ -5,8 +5,8 @@
*/
package org.elasticsearch.xpack.watcher.trigger.schedule.engine;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.watcher.trigger.TriggerEvent;
import org.elasticsearch.xpack.core.watcher.watch.ClockMock;
@ -273,6 +273,6 @@ public class TickerScheduleEngineTests extends ESTestCase {
private Watch createWatch(String name, Schedule schedule) {
return new Watch(name, new ScheduleTrigger(schedule), new ExecutableNoneInput(),
InternalAlwaysCondition.INSTANCE, null, null,
Collections.emptyList(), null, null, Versions.MATCH_ANY);
Collections.emptyList(), null, null, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
}
}

View File

@ -208,13 +208,16 @@ public class WatchTests extends ESTestCase {
TimeValue throttlePeriod = randomBoolean() ? null : TimeValue.timeValueSeconds(randomIntBetween(5, 10000));
Watch watch = new Watch("_name", trigger, input, condition, transform, throttlePeriod, actions, metadata, watchStatus, 1L);
final long sourceSeqNo = randomNonNegativeLong();
final long sourcePrimaryTerm = randomLongBetween(1, 200);
Watch watch = new Watch("_name", trigger, input, condition, transform, throttlePeriod, actions, metadata, watchStatus,
sourceSeqNo, sourcePrimaryTerm);
BytesReference bytes = BytesReference.bytes(jsonBuilder().value(watch));
logger.info("{}", bytes.utf8ToString());
WatchParser watchParser = new WatchParser(triggerService, actionRegistry, inputRegistry, null, clock);
Watch parsedWatch = watchParser.parse("_name", includeStatus, bytes, XContentType.JSON);
Watch parsedWatch = watchParser.parse("_name", includeStatus, bytes, XContentType.JSON, sourceSeqNo, sourcePrimaryTerm);
if (includeStatus) {
assertThat(parsedWatch.status(), equalTo(watchStatus));
@ -227,6 +230,8 @@ public class WatchTests extends ESTestCase {
}
assertThat(parsedWatch.metadata(), equalTo(metadata));
assertThat(parsedWatch.actions(), equalTo(actions));
assertThat(parsedWatch.getSourceSeqNo(), equalTo(sourceSeqNo));
assertThat(parsedWatch.getSourcePrimaryTerm(), equalTo(sourcePrimaryTerm));
}
public void testThatBothStatusFieldsCanBeRead() throws Exception {
@ -256,7 +261,7 @@ public class WatchTests extends ESTestCase {
WatchParser watchParser = new WatchParser(triggerService, actionRegistry, inputRegistry, null, clock);
XContentBuilder builder = jsonBuilder().startObject().startObject("trigger").endObject().field("status", watchStatus).endObject();
Watch watch = watchParser.parse("foo", true, BytesReference.bytes(builder), XContentType.JSON);
Watch watch = watchParser.parse("foo", true, BytesReference.bytes(builder), XContentType.JSON, 1L, 1L);
assertThat(watch.status().state().getTimestamp().getMillis(), is(clock.millis()));
for (ActionWrapper action : actions) {
assertThat(watch.status().actionStatus(action.id()), is(actionsStatuses.get(action.id())));
@ -284,7 +289,7 @@ public class WatchTests extends ESTestCase {
.endObject();
WatchParser watchParser = new WatchParser(triggerService, actionRegistry, inputRegistry, null, clock);
try {
watchParser.parse("failure", false, BytesReference.bytes(jsonBuilder), XContentType.JSON);
watchParser.parse("failure", false, BytesReference.bytes(jsonBuilder), XContentType.JSON, 1L, 1L);
fail("This watch should fail to parse as actions is an array");
} catch (ElasticsearchParseException pe) {
assertThat(pe.getMessage().contains("could not parse actions for watch [failure]"), is(true));
@ -309,7 +314,7 @@ public class WatchTests extends ESTestCase {
.endObject();
builder.endObject();
WatchParser watchParser = new WatchParser(triggerService, actionRegistry, inputRegistry, null, Clock.systemUTC());
Watch watch = watchParser.parse("failure", false, BytesReference.bytes(builder), XContentType.JSON);
Watch watch = watchParser.parse("failure", false, BytesReference.bytes(builder), XContentType.JSON, 1L, 1L);
assertThat(watch, notNullValue());
assertThat(watch.trigger(), instanceOf(ScheduleTrigger.class));
assertThat(watch.input(), instanceOf(ExecutableNoneInput.class));
@ -375,7 +380,7 @@ public class WatchTests extends ESTestCase {
builder.endObject();
// parse in default mode:
Watch watch = watchParser.parse("_id", false, BytesReference.bytes(builder), XContentType.JSON);
Watch watch = watchParser.parse("_id", false, BytesReference.bytes(builder), XContentType.JSON, 1L, 1L);
assertThat(((ScriptCondition) watch.condition()).getScript().getLang(), equalTo(Script.DEFAULT_SCRIPT_LANG));
WatcherSearchTemplateRequest request = ((SearchInput) watch.input().input()).getRequest();
SearchRequest searchRequest = searchTemplateService.toSearchRequest(request);
@ -394,7 +399,7 @@ public class WatchTests extends ESTestCase {
builder.endObject();
WatchParser parser = createWatchparser();
Watch watch = parser.parse("_id", false, BytesReference.bytes(builder), XContentType.JSON);
Watch watch = parser.parse("_id", false, BytesReference.bytes(builder), XContentType.JSON, 1L, 1L);
assertThat(watch, is(notNullValue()));
assertThat(watch.input().type(), is(NoneInput.TYPE));
}
@ -410,7 +415,7 @@ public class WatchTests extends ESTestCase {
builder.endObject();
WatchParser parser = createWatchparser();
Watch watch = parser.parse("_id", false, BytesReference.bytes(builder), XContentType.JSON);
Watch watch = parser.parse("_id", false, BytesReference.bytes(builder), XContentType.JSON, 1L, 1L);
assertThat(watch, is(notNullValue()));
assertThat(watch.actions(), hasSize(0));
}
@ -429,7 +434,7 @@ public class WatchTests extends ESTestCase {
WatchParser parser = createWatchparser();
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class,
() -> parser.parse("_id", false, BytesReference.bytes(builder), XContentType.JSON));
() -> parser.parse("_id", false, BytesReference.bytes(builder), XContentType.JSON, 1L, 1L));
assertThat(e.getMessage(), is("could not parse watch [_id]. missing required field [trigger]"));
}
}