[7.x] Support `if_seq_no` and `if_primary_term` for ingest (#55430) (#57768)

Allow for optimistic concurrency control during ingest by checking the
sequence number and primary term. This is accomplished by defining
_if_seq_no and _if_primary_term in the pipeline, similarly to _version
and _version_type.

Closes #41255
Co-authored-by: Maria Ralli <mariai.ralli@gmail.com>
This commit is contained in:
Jake Landis 2020-06-09 14:20:26 -05:00 committed by GitHub
parent 3945712c72
commit fff0a106c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 113 additions and 2 deletions

View File

@ -124,6 +124,22 @@ public class SetProcessorTests extends ESTestCase {
assertThat(ingestDocument.getFieldValue(Metadata.VERSION_TYPE.getFieldName(), String.class), Matchers.equalTo(versionType));
}
public void testSetMetadataIfSeqNo() throws Exception {
long ifSeqNo = randomNonNegativeLong();
Processor processor = createSetProcessor(Metadata.IF_SEQ_NO.getFieldName(), ifSeqNo, true);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(Metadata.IF_SEQ_NO.getFieldName(), Long.class), Matchers.equalTo(ifSeqNo));
}
public void testSetMetadataIfPrimaryTerm() throws Exception {
long ifPrimaryTerm = randomNonNegativeLong();
Processor processor = createSetProcessor(Metadata.IF_PRIMARY_TERM.getFieldName(), ifPrimaryTerm, true);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(Metadata.IF_PRIMARY_TERM.getFieldName(), Long.class), Matchers.equalTo(ifPrimaryTerm));
}
private static Processor createSetProcessor(String fieldName, Object fieldValue, boolean overrideEnabled) {
return new SetProcessor(randomAlphaOfLength(10), new TestTemplateService.MockTemplateScript.Factory(fieldName),
ValueSource.wrap(fieldValue, TestTemplateService.instance()), overrideEnabled);

View File

@ -0,0 +1,62 @@
---
teardown:
- do:
ingest.delete_pipeline:
id: "my_pipeline"
ignore: 404
---
"Test set _if_seq_no & _if_primary_term":
- do:
cluster.health:
wait_for_status: green
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "_if_seq_no",
"value": 0
}
},
{
"set" : {
"field" : "_if_primary_term",
"value": 1
}
}
]
}
- match: { acknowledged: true }
- do:
catch: conflict
index:
index: test
id: 1
pipeline: "my_pipeline"
body: {}
- match: { error.root_cause.0.type: "version_conflict_engine_exception" }
- match: { error.root_cause.0.reason: "[1]: version conflict, required seqNo [0], primary term [1]. but no document was found" }
- do:
index:
index: test
id: 1
body: {}
- match: { _seq_no: 0 }
- match: { _primary_term: 1 }
- do:
index:
index: test
id: 1
pipeline: "my_pipeline"
body: {}
- match: { _seq_no: 1 }
- match: { _primary_term: 1 }

View File

@ -207,6 +207,14 @@ public class SimulatePipelineRequest extends ActionRequest implements ToXContent
}
IngestDocument ingestDocument =
new IngestDocument(index, type, id, routing, version, versionType, document);
if (dataMap.containsKey(Metadata.IF_SEQ_NO.getFieldName())) {
Long ifSeqNo = (Long) ConfigurationUtils.readObject(null, null, dataMap, Metadata.IF_SEQ_NO.getFieldName());
ingestDocument.setFieldValue(Metadata.IF_SEQ_NO.getFieldName(), ifSeqNo);
}
if (dataMap.containsKey(Metadata.IF_PRIMARY_TERM.getFieldName())) {
Long ifPrimaryTerm = (Long) ConfigurationUtils.readObject(null, null, dataMap, Metadata.IF_PRIMARY_TERM.getFieldName());
ingestDocument.setFieldValue(Metadata.IF_PRIMARY_TERM.getFieldName(), ifPrimaryTerm);
}
ingestDocumentList.add(ingestDocument);
}
return ingestDocumentList;

View File

@ -703,7 +703,9 @@ public final class IngestDocument {
ID(IdFieldMapper.NAME),
ROUTING(RoutingFieldMapper.NAME),
VERSION(VersionFieldMapper.NAME),
VERSION_TYPE("_version_type");
VERSION_TYPE("_version_type"),
IF_SEQ_NO("_if_seq_no"),
IF_PRIMARY_TERM("_if_primary_term");
private final String fieldName;

View File

@ -525,6 +525,12 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
if (metadataMap.get(IngestDocument.Metadata.VERSION_TYPE) != null) {
indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.Metadata.VERSION_TYPE)));
}
if (metadataMap.get(IngestDocument.Metadata.IF_SEQ_NO) != null) {
indexRequest.setIfSeqNo(((Number) metadataMap.get(IngestDocument.Metadata.IF_SEQ_NO)).longValue());
}
if (metadataMap.get(IngestDocument.Metadata.IF_PRIMARY_TERM) != null) {
indexRequest.setIfPrimaryTerm(((Number) metadataMap.get(IngestDocument.Metadata.IF_PRIMARY_TERM)).longValue());
}
indexRequest.source(ingestDocument.getSourceAndMetadata(), indexRequest.getContentType());
handler.accept(null);
}

View File

@ -47,6 +47,8 @@ import static org.elasticsearch.ingest.IngestDocument.Metadata.ROUTING;
import static org.elasticsearch.ingest.IngestDocument.Metadata.TYPE;
import static org.elasticsearch.ingest.IngestDocument.Metadata.VERSION;
import static org.elasticsearch.ingest.IngestDocument.Metadata.VERSION_TYPE;
import static org.elasticsearch.ingest.IngestDocument.Metadata.IF_SEQ_NO;
import static org.elasticsearch.ingest.IngestDocument.Metadata.IF_PRIMARY_TERM;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
@ -150,7 +152,8 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {
for (int i = 0; i < numDocs; i++) {
Map<String, Object> doc = new HashMap<>();
Map<String, Object> expectedDoc = new HashMap<>();
List<IngestDocument.Metadata> fields = Arrays.asList(INDEX, TYPE, ID, ROUTING, VERSION, VERSION_TYPE);
List<IngestDocument.Metadata> fields = Arrays.asList(INDEX, TYPE, ID, ROUTING, VERSION, VERSION_TYPE, IF_SEQ_NO,
IF_PRIMARY_TERM);
for(IngestDocument.Metadata field : fields) {
if (field == VERSION) {
Long value = randomLong();
@ -162,6 +165,10 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {
);
doc.put(field.getFieldName(), value);
expectedDoc.put(field.getFieldName(), value);
} else if (field == IF_SEQ_NO || field == IF_PRIMARY_TERM) {
Long value = randomNonNegativeLong();
doc.put(field.getFieldName(), value);
expectedDoc.put(field.getFieldName(), value);
} else if (field == TYPE) {
if (useExplicitType) {
String value = randomAlphaOfLengthBetween(1, 10);
@ -230,6 +237,8 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {
assertThat(metadataMap.get(ROUTING), equalTo(expectedDocument.get(ROUTING.getFieldName())));
assertThat(metadataMap.get(VERSION), equalTo(expectedDocument.get(VERSION.getFieldName())));
assertThat(metadataMap.get(VERSION_TYPE), equalTo(expectedDocument.get(VERSION_TYPE.getFieldName())));
assertThat(metadataMap.get(IF_SEQ_NO), equalTo(expectedDocument.get(IF_SEQ_NO.getFieldName())));
assertThat(metadataMap.get(IF_PRIMARY_TERM), equalTo(expectedDocument.get(IF_PRIMARY_TERM.getFieldName())));
assertThat(ingestDocument.getSourceAndMetadata(), equalTo(expectedDocument.get(Fields.SOURCE)));
}

View File

@ -763,6 +763,8 @@ public class IngestServiceTests extends ESTestCase {
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
final long newVersion = randomLong();
final String versionType = randomFrom("internal", "external", "external_gt", "external_gte");
final long ifSeqNo = randomNonNegativeLong();
final long ifPrimaryTerm = randomNonNegativeLong();
doAnswer((InvocationOnMock invocationOnMock) -> {
IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0];
for (IngestDocument.Metadata metadata : IngestDocument.Metadata.values()) {
@ -770,6 +772,10 @@ public class IngestServiceTests extends ESTestCase {
ingestDocument.setFieldValue(metadata.getFieldName(), newVersion);
} else if (metadata == IngestDocument.Metadata.VERSION_TYPE) {
ingestDocument.setFieldValue(metadata.getFieldName(), versionType);
} else if (metadata == IngestDocument.Metadata.IF_SEQ_NO) {
ingestDocument.setFieldValue(metadata.getFieldName(), ifSeqNo);
} else if (metadata == IngestDocument.Metadata.IF_PRIMARY_TERM) {
ingestDocument.setFieldValue(metadata.getFieldName(), ifPrimaryTerm);
} else {
ingestDocument.setFieldValue(metadata.getFieldName(), "update" + metadata.getFieldName());
}
@ -796,6 +802,8 @@ public class IngestServiceTests extends ESTestCase {
assertThat(indexRequest.routing(), equalTo("update_routing"));
assertThat(indexRequest.version(), equalTo(newVersion));
assertThat(indexRequest.versionType(), equalTo(VersionType.fromString(versionType)));
assertThat(indexRequest.ifSeqNo(), equalTo(ifSeqNo));
assertThat(indexRequest.ifPrimaryTerm(), equalTo(ifPrimaryTerm));
}
public void testExecuteFailure() throws Exception {