From fff0a106c9caf2f47ad144af30309f356c3c1589 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Tue, 9 Jun 2020 14:20:26 -0500 Subject: [PATCH] [7.x] Support `if_seq_no` and `if_primary_term` for ingest (#55430) (#57768) Allow for optimistic concurrency control during ingest by checking the sequence number and primary term. This is accomplished by defining _if_seq_no and _if_primary_term in the pipeline, similarly to _version and _version_type. Closes #41255 Co-authored-by: Maria Ralli --- .../ingest/common/SetProcessorTests.java | 16 +++++ .../rest-api-spec/test/ingest/260_seq_no.yml | 62 +++++++++++++++++++ .../ingest/SimulatePipelineRequest.java | 8 +++ .../elasticsearch/ingest/IngestDocument.java | 4 +- .../elasticsearch/ingest/IngestService.java | 6 ++ .../SimulatePipelineRequestParsingTests.java | 11 +++- .../ingest/IngestServiceTests.java | 8 +++ 7 files changed, 113 insertions(+), 2 deletions(-) create mode 100644 modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/260_seq_no.yml diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SetProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SetProcessorTests.java index f641ad604c3..09dfade8f5c 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SetProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SetProcessorTests.java @@ -124,6 +124,22 @@ public class SetProcessorTests extends ESTestCase { assertThat(ingestDocument.getFieldValue(Metadata.VERSION_TYPE.getFieldName(), String.class), Matchers.equalTo(versionType)); } + public void testSetMetadataIfSeqNo() throws Exception { + long ifSeqNo = randomNonNegativeLong(); + Processor processor = createSetProcessor(Metadata.IF_SEQ_NO.getFieldName(), ifSeqNo, true); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + processor.execute(ingestDocument); + assertThat(ingestDocument.getFieldValue(Metadata.IF_SEQ_NO.getFieldName(), Long.class), Matchers.equalTo(ifSeqNo)); + } + + public void testSetMetadataIfPrimaryTerm() throws Exception { + long ifPrimaryTerm = randomNonNegativeLong(); + Processor processor = createSetProcessor(Metadata.IF_PRIMARY_TERM.getFieldName(), ifPrimaryTerm, true); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + processor.execute(ingestDocument); + assertThat(ingestDocument.getFieldValue(Metadata.IF_PRIMARY_TERM.getFieldName(), Long.class), Matchers.equalTo(ifPrimaryTerm)); + } + private static Processor createSetProcessor(String fieldName, Object fieldValue, boolean overrideEnabled) { return new SetProcessor(randomAlphaOfLength(10), new TestTemplateService.MockTemplateScript.Factory(fieldName), ValueSource.wrap(fieldValue, TestTemplateService.instance()), overrideEnabled); diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/260_seq_no.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/260_seq_no.yml new file mode 100644 index 00000000000..7c8c7d1c677 --- /dev/null +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/260_seq_no.yml @@ -0,0 +1,62 @@ +--- +teardown: + - do: + ingest.delete_pipeline: + id: "my_pipeline" + ignore: 404 + +--- +"Test set _if_seq_no & _if_primary_term": + - do: + cluster.health: + wait_for_status: green + + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "set" : { + "field" : "_if_seq_no", + "value": 0 + } + }, + { + "set" : { + "field" : "_if_primary_term", + "value": 1 + } + } + ] + } + - match: { acknowledged: true } + + - do: + catch: conflict + index: + index: test + id: 1 + pipeline: "my_pipeline" + body: {} + - match: { error.root_cause.0.type: "version_conflict_engine_exception" } + - match: { error.root_cause.0.reason: "[1]: version conflict, required seqNo [0], primary term [1]. but no document was found" } + + - do: + index: + index: test + id: 1 + body: {} + - match: { _seq_no: 0 } + - match: { _primary_term: 1 } + + - do: + index: + index: test + id: 1 + pipeline: "my_pipeline" + body: {} + - match: { _seq_no: 1 } + - match: { _primary_term: 1 } diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java index 7563ddf278d..bafdfcd7b98 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java @@ -207,6 +207,14 @@ public class SimulatePipelineRequest extends ActionRequest implements ToXContent } IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, version, versionType, document); + if (dataMap.containsKey(Metadata.IF_SEQ_NO.getFieldName())) { + Long ifSeqNo = (Long) ConfigurationUtils.readObject(null, null, dataMap, Metadata.IF_SEQ_NO.getFieldName()); + ingestDocument.setFieldValue(Metadata.IF_SEQ_NO.getFieldName(), ifSeqNo); + } + if (dataMap.containsKey(Metadata.IF_PRIMARY_TERM.getFieldName())) { + Long ifPrimaryTerm = (Long) ConfigurationUtils.readObject(null, null, dataMap, Metadata.IF_PRIMARY_TERM.getFieldName()); + ingestDocument.setFieldValue(Metadata.IF_PRIMARY_TERM.getFieldName(), ifPrimaryTerm); + } ingestDocumentList.add(ingestDocument); } return ingestDocumentList; diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 7d150621781..e924c097c46 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -703,7 +703,9 @@ public final class IngestDocument { ID(IdFieldMapper.NAME), ROUTING(RoutingFieldMapper.NAME), VERSION(VersionFieldMapper.NAME), - VERSION_TYPE("_version_type"); + VERSION_TYPE("_version_type"), + IF_SEQ_NO("_if_seq_no"), + IF_PRIMARY_TERM("_if_primary_term"); private final String fieldName; diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 9994f5328fc..fe2449dca58 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -525,6 +525,12 @@ public class IngestService implements ClusterStateApplier, ReportingService doc = new HashMap<>(); Map expectedDoc = new HashMap<>(); - List fields = Arrays.asList(INDEX, TYPE, ID, ROUTING, VERSION, VERSION_TYPE); + List fields = Arrays.asList(INDEX, TYPE, ID, ROUTING, VERSION, VERSION_TYPE, IF_SEQ_NO, + IF_PRIMARY_TERM); for(IngestDocument.Metadata field : fields) { if (field == VERSION) { Long value = randomLong(); @@ -162,6 +165,10 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase { ); doc.put(field.getFieldName(), value); expectedDoc.put(field.getFieldName(), value); + } else if (field == IF_SEQ_NO || field == IF_PRIMARY_TERM) { + Long value = randomNonNegativeLong(); + doc.put(field.getFieldName(), value); + expectedDoc.put(field.getFieldName(), value); } else if (field == TYPE) { if (useExplicitType) { String value = randomAlphaOfLengthBetween(1, 10); @@ -230,6 +237,8 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase { assertThat(metadataMap.get(ROUTING), equalTo(expectedDocument.get(ROUTING.getFieldName()))); assertThat(metadataMap.get(VERSION), equalTo(expectedDocument.get(VERSION.getFieldName()))); assertThat(metadataMap.get(VERSION_TYPE), equalTo(expectedDocument.get(VERSION_TYPE.getFieldName()))); + assertThat(metadataMap.get(IF_SEQ_NO), equalTo(expectedDocument.get(IF_SEQ_NO.getFieldName()))); + assertThat(metadataMap.get(IF_PRIMARY_TERM), equalTo(expectedDocument.get(IF_PRIMARY_TERM.getFieldName()))); assertThat(ingestDocument.getSourceAndMetadata(), equalTo(expectedDocument.get(Fields.SOURCE))); } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index ea7a11cf79d..6d55839f201 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -763,6 +763,8 @@ public class IngestServiceTests extends ESTestCase { ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final long newVersion = randomLong(); final String versionType = randomFrom("internal", "external", "external_gt", "external_gte"); + final long ifSeqNo = randomNonNegativeLong(); + final long ifPrimaryTerm = randomNonNegativeLong(); doAnswer((InvocationOnMock invocationOnMock) -> { IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0]; for (IngestDocument.Metadata metadata : IngestDocument.Metadata.values()) { @@ -770,6 +772,10 @@ public class IngestServiceTests extends ESTestCase { ingestDocument.setFieldValue(metadata.getFieldName(), newVersion); } else if (metadata == IngestDocument.Metadata.VERSION_TYPE) { ingestDocument.setFieldValue(metadata.getFieldName(), versionType); + } else if (metadata == IngestDocument.Metadata.IF_SEQ_NO) { + ingestDocument.setFieldValue(metadata.getFieldName(), ifSeqNo); + } else if (metadata == IngestDocument.Metadata.IF_PRIMARY_TERM) { + ingestDocument.setFieldValue(metadata.getFieldName(), ifPrimaryTerm); } else { ingestDocument.setFieldValue(metadata.getFieldName(), "update" + metadata.getFieldName()); } @@ -796,6 +802,8 @@ public class IngestServiceTests extends ESTestCase { assertThat(indexRequest.routing(), equalTo("update_routing")); assertThat(indexRequest.version(), equalTo(newVersion)); assertThat(indexRequest.versionType(), equalTo(VersionType.fromString(versionType))); + assertThat(indexRequest.ifSeqNo(), equalTo(ifSeqNo)); + assertThat(indexRequest.ifPrimaryTerm(), equalTo(ifPrimaryTerm)); } public void testExecuteFailure() throws Exception {