Adapt minimum versions for seq# powered operations in Watch related requests and UpdateRequest (#38231)

After backporting #37977, #37857 and #37872
This commit is contained in:
Boaz Leskes 2019-02-02 02:37:16 +01:00 committed by Jason Tedor
parent 783c9ed372
commit f6e06a2b19
8 changed files with 24 additions and 54 deletions

View File

@ -159,8 +159,8 @@ task verifyVersions {
* the enabled state of every bwc task. It should be set back to true
* after the backport of the backcompat code is complete.
*/
final boolean bwc_tests_enabled = false
final String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/pull/37951" /* place a PR link here when committing bwc changes */
final boolean bwc_tests_enabled = true
final String bwc_tests_disabled_issue = "" /* place a PR link here when committing bwc changes */
if (bwc_tests_enabled == false) {
if (bwc_tests_disabled_issue.isEmpty()) {
throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false")

View File

@ -2,8 +2,8 @@
"Update with if_seq_no":
- skip:
version: " - 6.99.99"
reason: if_seq_no was added in 7.0
version: " - 6.6.99"
reason: if_seq_no was added in 6.7.0
- do:
catch: missing

View File

@ -879,10 +879,8 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
docAsUpsert = in.readBoolean();
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
ifSeqNo = in.readZLong();
ifPrimaryTerm = in.readVLong();
}
ifSeqNo = in.readZLong();
ifPrimaryTerm = in.readVLong();
detectNoop = in.readBoolean();
scriptedUpsert = in.readBoolean();
}
@ -934,10 +932,8 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
out.writeBoolean(docAsUpsert);
out.writeLong(version);
out.writeByte(versionType.getValue());
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeZLong(ifSeqNo);
out.writeVLong(ifPrimaryTerm);
}
out.writeZLong(ifSeqNo);
out.writeVLong(ifPrimaryTerm);
out.writeBoolean(detectNoop);
out.writeBoolean(scriptedUpsert);
}

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.protocol.xpack.watcher;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.Strings;
@ -60,13 +59,8 @@ public final class PutWatchRequest extends ActionRequest {
active = in.readBoolean();
xContentType = in.readEnum(XContentType.class);
version = in.readZLong();
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
ifSeqNo = in.readZLong();
ifPrimaryTerm = in.readVLong();
} else {
ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
}
ifSeqNo = in.readZLong();
ifPrimaryTerm = in.readVLong();
}
@Override
@ -77,10 +71,8 @@ public final class PutWatchRequest extends ActionRequest {
out.writeBoolean(active);
out.writeEnum(xContentType);
out.writeZLong(version);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeZLong(ifSeqNo);
out.writeVLong(ifPrimaryTerm);
}
out.writeZLong(ifSeqNo);
out.writeVLong(ifPrimaryTerm);
}
/**

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.protocol.xpack.watcher;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
@ -110,10 +109,8 @@ public class PutWatchResponse extends ActionResponse implements ToXContentObject
super.writeTo(out);
out.writeString(id);
out.writeVLong(version);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeZLong(seqNo);
out.writeVLong(primaryTerm);
}
out.writeZLong(seqNo);
out.writeVLong(primaryTerm);
out.writeBoolean(created);
}
@ -122,13 +119,8 @@ public class PutWatchResponse extends ActionResponse implements ToXContentObject
super.readFrom(in);
id = in.readString();
version = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
seqNo = in.readZLong();
primaryTerm = in.readVLong();
} else {
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
}
seqNo = in.readZLong();
primaryTerm = in.readVLong();
created = in.readBoolean();
}

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.core.watcher.transport.actions.get;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
@ -96,10 +95,8 @@ public class GetWatchResponse extends ActionResponse implements ToXContent {
status = WatchStatus.read(in);
source = XContentSource.readFrom(in);
version = in.readZLong();
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
seqNo = in.readZLong();
primaryTerm = in.readVLong();
}
seqNo = in.readZLong();
primaryTerm = in.readVLong();
} else {
status = null;
source = null;
@ -118,10 +115,8 @@ public class GetWatchResponse extends ActionResponse implements ToXContent {
status.writeTo(out);
XContentSource.writeTo(source, out);
out.writeZLong(version);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeZLong(seqNo);
out.writeVLong(primaryTerm);
}
out.writeZLong(seqNo);
out.writeVLong(primaryTerm);
}
}

View File

@ -224,8 +224,8 @@ setup:
---
"Test putting a watch with a redacted password with old seq no returns an error":
- skip:
version: " - 6.99.99"
reason: seq no powered concurrency was added in 7.0.0
version: " - 6.6.99"
reason: seq no powered concurrency was added in 6.7.0
# version 1
- do:

View File

@ -7,7 +7,6 @@ package org.elasticsearch.xpack.watcher.transport.actions.ack;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
@ -102,12 +101,8 @@ public class TransportAckWatchAction extends WatcherTransportAction<AckWatchRequ
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId());
// this may reject this action, but prevents concurrent updates from a watch execution
if (clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_7_0_0)) {
updateRequest.setIfSeqNo(getResponse.getSeqNo());
updateRequest.setIfPrimaryTerm(getResponse.getPrimaryTerm());
} else {
updateRequest.version(getResponse.getVersion());
}
updateRequest.setIfSeqNo(getResponse.getSeqNo());
updateRequest.setIfPrimaryTerm(getResponse.getPrimaryTerm());
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
XContentBuilder builder = jsonBuilder();
builder.startObject()