From bfe9d290a6ea915d0e94358929b937953571a511 Mon Sep 17 00:00:00 2001 From: Peter-Josef Meisch Date: Tue, 30 Jun 2020 21:13:22 +0200 Subject: [PATCH] DATAES-876 - Add seqno and primary term to entity on initial save. Original PR: #490 --- ...elasticsearch-migration-guide-4.0-4.1.adoc | 8 ++- .../core/AbstractElasticsearchTemplate.java | 38 +++++++++----- .../core/DocumentOperations.java | 16 +++--- .../core/ElasticsearchRestTemplate.java | 18 ++++--- .../core/ElasticsearchTemplate.java | 13 +++-- .../core/IndexedObjectInformation.java | 52 +++++++++++++++++++ .../core/ElasticsearchTemplateTests.java | 23 ++++++++ 7 files changed, 135 insertions(+), 33 deletions(-) create mode 100644 src/main/java/org/springframework/data/elasticsearch/core/IndexedObjectInformation.java diff --git a/src/main/asciidoc/reference/elasticsearch-migration-guide-4.0-4.1.adoc b/src/main/asciidoc/reference/elasticsearch-migration-guide-4.0-4.1.adoc index b5f98598a..8765ffcce 100644 --- a/src/main/asciidoc/reference/elasticsearch-migration-guide-4.0-4.1.adoc +++ b/src/main/asciidoc/reference/elasticsearch-migration-guide-4.0-4.1.adoc @@ -16,7 +16,8 @@ In the `ReactiveElasticsearchClient.Indices` interface the `updateMapping` metho They do the same, but `putMapping` is consistent with the naming in the Elasticsearch API: .Alias handling -In the `IndexOperations` interface the methods `addAlias(AliasQuery)`, `removeAlias(AliasQuery)` and `queryForAlias()` have been deprecated. The new methods `alias(AliasAction)`, `getAliases(String...)` and `getAliasesForIndex(String...)` offer more functionality and a cleaner API. +In the `IndexOperations` interface the methods `addAlias(AliasQuery)`, `removeAlias(AliasQuery)` and `queryForAlias()` have been deprecated. +The new methods `alias(AliasAction)`, `getAliases(String...)` and `getAliasesForIndex(String...)` offer more functionality and a cleaner API. .Parent-ID Usage of a parent-id has been removed from Elasticsearch since version 6. We now deprecate the corresponding fields and methods. @@ -38,3 +39,8 @@ With the introduction of the `ReactiveIndexOperations` it became necessary to ch * the `createIndex` variants now return a `Mono` instead of a `Mono` to signal successful index creation. * the `updateMapping` variants now return a `Mono` instead of a `Mono` to signal successful mappings storage. + +=== Return types of DocumentOperartions.bulkIndex methods + +These methods were returing a `List` containing the ids of the new indexed records. +Now they return a `List`; these objects contain the id and information about optimistic locking (seq_no and primary_term) diff --git a/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java index e20b231bc..0d7a1530f 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.search.MultiSearchRequest; @@ -59,6 +60,7 @@ import org.springframework.data.elasticsearch.core.query.Query; import org.springframework.data.elasticsearch.core.query.SeqNoPrimaryTerm; import org.springframework.data.elasticsearch.core.query.UpdateQuery; import org.springframework.data.elasticsearch.support.VersionInfo; +import org.springframework.data.mapping.PersistentPropertyAccessor; import org.springframework.data.mapping.callback.EntityCallbacks; import org.springframework.data.util.CloseableIterator; import org.springframework.data.util.Streamable; @@ -175,11 +177,9 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper .collect(Collectors.toList()); if (!indexQueries.isEmpty()) { - List ids = bulkIndex(indexQueries, index); - Iterator idIterator = ids.iterator(); - entities.forEach(entity -> { - setPersistentEntityId(entity, idIterator.next()); - }); + List indexedObjectInformations = bulkIndex(indexQueries, index); + Iterator iterator = indexedObjectInformations.iterator(); + entities.forEach(entity -> updateIndexedObject(entity, iterator.next())); } return indexQueries.stream().map(IndexQuery::getObject).map(entity -> (T) entity).collect(Collectors.toList()); @@ -250,12 +250,12 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper } @Override - public List bulkIndex(List queries, Class clazz) { + public List bulkIndex(List queries, Class clazz) { return bulkIndex(queries, getIndexCoordinatesFor(clazz)); } @Override - public List bulkIndex(List queries, BulkOptions bulkOptions, Class clazz) { + public List bulkIndex(List queries, BulkOptions bulkOptions, Class clazz) { return bulkIndex(queries, bulkOptions, getIndexCoordinatesFor(clazz)); } @@ -473,7 +473,7 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper * @param bulkResponse * @return the list of the item id's */ - protected List checkForBulkOperationFailure(BulkResponse bulkResponse) { + protected List checkForBulkOperationFailure(BulkResponse bulkResponse) { if (bulkResponse.hasFailures()) { Map failedDocuments = new HashMap<>(); @@ -488,17 +488,31 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper failedDocuments); } - return Stream.of(bulkResponse.getItems()).map(BulkItemResponse::getId).collect(Collectors.toList()); + return Stream.of(bulkResponse.getItems()).map(bulkItemResponse -> { + DocWriteResponse response = bulkItemResponse.getResponse(); + if (response != null) { + return IndexedObjectInformation.of(response.getId(), response.getSeqNo(), response.getPrimaryTerm()); + } else { + return IndexedObjectInformation.of(bulkItemResponse.getId(), null, null); + } + + }).collect(Collectors.toList()); } - protected void setPersistentEntityId(Object entity, String id) { - + protected void updateIndexedObject(Object entity, IndexedObjectInformation indexedObjectInformation) { ElasticsearchPersistentEntity persistentEntity = getRequiredPersistentEntity(entity.getClass()); + PersistentPropertyAccessor propertyAccessor = persistentEntity.getPropertyAccessor(entity); ElasticsearchPersistentProperty idProperty = persistentEntity.getIdProperty(); // Only deal with text because ES generated Ids are strings! if (idProperty != null && idProperty.getType().isAssignableFrom(String.class)) { - persistentEntity.getPropertyAccessor(entity).setProperty(idProperty, id); + propertyAccessor.setProperty(idProperty, indexedObjectInformation.getId()); + } + + if (persistentEntity.hasSeqNoPrimaryTermProperty()) { + ElasticsearchPersistentProperty seqNoPrimaryTermProperty = persistentEntity.getSeqNoPrimaryTermProperty(); + propertyAccessor.setProperty(seqNoPrimaryTermProperty, + new SeqNoPrimaryTerm(indexedObjectInformation.getSeqNo(), indexedObjectInformation.getPrimaryTerm())); } } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java index 005aff73c..47941263d 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java @@ -157,10 +157,10 @@ public interface DocumentOperations { * * @param queries the queries to execute in bulk * @param clazz the entity class - * @return the ids of the indexed objects + * @return the information about the indexed objects * @since 4.1 */ - default List bulkIndex(List queries, Class clazz) { + default List bulkIndex(List queries, Class clazz) { return bulkIndex(queries, BulkOptions.defaultOptions(), clazz); } @@ -168,9 +168,9 @@ public interface DocumentOperations { * Bulk index all objects. Will do save or update. * * @param queries the queries to execute in bulk - * @return the ids of the indexed objects + * @return the information about of the indexed objects */ - default List bulkIndex(List queries, IndexCoordinates index) { + default List bulkIndex(List queries, IndexCoordinates index) { return bulkIndex(queries, BulkOptions.defaultOptions(), index); } @@ -180,19 +180,19 @@ public interface DocumentOperations { * @param queries the queries to execute in bulk * @param bulkOptions options to be added to the bulk request * @param clazz the entity class - * @return the ids of the indexed objects + * @return the information about of the indexed objects * @since 4.1 */ - List bulkIndex(List queries, BulkOptions bulkOptions, Class clazz); + List bulkIndex(List queries, BulkOptions bulkOptions, Class clazz); /** * Bulk index all objects. Will do save or update. * * @param queries the queries to execute in bulk * @param bulkOptions options to be added to the bulk request - * @return the ids of the indexed objects + * @return the information about of the indexed objects */ - List bulkIndex(List queries, BulkOptions bulkOptions, IndexCoordinates index); + List bulkIndex(List queries, BulkOptions bulkOptions, IndexCoordinates index); /** * Bulk update all objects. Will do update. diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java index 094d03633..0ac9be2cb 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.MultiGetRequest; import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.MultiSearchResponse; @@ -142,17 +143,18 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate { maybeCallbackBeforeConvertWithQuery(query, index); IndexRequest request = requestFactory.indexRequest(query, index); - String documentId = execute(client -> client.index(request, RequestOptions.DEFAULT).getId()); + IndexResponse indexResponse = execute(client -> client.index(request, RequestOptions.DEFAULT)); // We should call this because we are not going through a mapper. Object queryObject = query.getObject(); if (queryObject != null) { - setPersistentEntityId(queryObject, documentId); + updateIndexedObject(queryObject, + IndexedObjectInformation.of(indexResponse.getId(), indexResponse.getSeqNo(), indexResponse.getPrimaryTerm())); } maybeCallbackAfterSaveWithQuery(query, index); - return documentId; + return indexResponse.getId(); } @Override @@ -186,7 +188,8 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate { } @Override - public List bulkIndex(List queries, BulkOptions bulkOptions, IndexCoordinates index) { + public List bulkIndex(List queries, BulkOptions bulkOptions, + IndexCoordinates index) { Assert.notNull(queries, "List of IndexQuery must not be null"); Assert.notNull(bulkOptions, "BulkOptions must not be null"); @@ -234,13 +237,14 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate { return new UpdateResponse(result); } - private List doBulkOperation(List queries, BulkOptions bulkOptions, IndexCoordinates index) { + private List doBulkOperation(List queries, BulkOptions bulkOptions, + IndexCoordinates index) { maybeCallbackBeforeConvertWithQueries(queries, index); BulkRequest bulkRequest = requestFactory.bulkRequest(queries, bulkOptions, index); - List ids = checkForBulkOperationFailure( + List indexedObjectInformations = checkForBulkOperationFailure( execute(client -> client.bulk(bulkRequest, RequestOptions.DEFAULT))); maybeCallbackAfterSaveWithQueries(queries, index); - return ids; + return indexedObjectInformations; } // endregion diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java index 07b794813..ce71d7480 100755 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java @@ -162,7 +162,8 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate { // We should call this because we are not going through a mapper. Object queryObject = query.getObject(); if (queryObject != null) { - setPersistentEntityId(queryObject, documentId); + updateIndexedObject(queryObject, + IndexedObjectInformation.of(documentId, response.getSeqNo(), response.getPrimaryTerm())); } maybeCallbackAfterSaveWithQuery(query, index); @@ -201,16 +202,17 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate { } @Override - public List bulkIndex(List queries, BulkOptions bulkOptions, IndexCoordinates index) { + public List bulkIndex(List queries, BulkOptions bulkOptions, + IndexCoordinates index) { Assert.notNull(queries, "List of IndexQuery must not be null"); Assert.notNull(bulkOptions, "BulkOptions must not be null"); - List ids = doBulkOperation(queries, bulkOptions, index); + List indexedObjectInformations = doBulkOperation(queries, bulkOptions, index); maybeCallbackAfterSaveWithQueries(queries, index); - return ids; + return indexedObjectInformations; } @Override @@ -257,7 +259,8 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate { return new UpdateResponse(result); } - private List doBulkOperation(List queries, BulkOptions bulkOptions, IndexCoordinates index) { + private List doBulkOperation(List queries, BulkOptions bulkOptions, + IndexCoordinates index) { maybeCallbackBeforeConvertWithQueries(queries, index); BulkRequestBuilder bulkRequest = requestFactory.bulkRequestBuilder(client, queries, bulkOptions, index); return checkForBulkOperationFailure(bulkRequest.execute().actionGet()); diff --git a/src/main/java/org/springframework/data/elasticsearch/core/IndexedObjectInformation.java b/src/main/java/org/springframework/data/elasticsearch/core/IndexedObjectInformation.java new file mode 100644 index 000000000..436f58e23 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/IndexedObjectInformation.java @@ -0,0 +1,52 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core; + +import org.springframework.lang.Nullable; + +/** + * Value class capturing information about a newly indexed document in Elasticsearch. + * + * @author Peter-Josef Meisch + * @since 4.1 + */ +public class IndexedObjectInformation { + private final String id; + @Nullable private final Long seqNo; + @Nullable private final Long primaryTerm; + + private IndexedObjectInformation(String id, @Nullable Long seqNo, @Nullable Long primaryTerm) { + this.id = id; + this.seqNo = seqNo; + this.primaryTerm = primaryTerm; + } + + public static IndexedObjectInformation of(String id, @Nullable Long seqNo, @Nullable Long primaryTerm) { + return new IndexedObjectInformation(id, seqNo, primaryTerm); + } + + public String getId() { + return id; + } + + public long getSeqNo() { + return seqNo; + } + + public long getPrimaryTerm() { + return primaryTerm; + } +} diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java index 2d24c4c66..40d9537b1 100755 --- a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java @@ -3156,6 +3156,29 @@ public abstract class ElasticsearchTemplateTests { assertThat(operations.exists("42", index)).isTrue(); } + @Test // DATAES-876 + void shouldReturnSeqNoPrimaryTermOnSave() { + OptimisticEntity original = new OptimisticEntity(); + original.setMessage("It's fine"); + OptimisticEntity saved = operations.save(original); + + assertThatSeqNoPrimaryTermIsFilled(saved); + } + + @Test // DATAES-876 + void shouldReturnSeqNoPrimaryTermOnBulkSave() { + OptimisticEntity original1 = new OptimisticEntity(); + original1.setMessage("It's fine 1"); + OptimisticEntity original2 = new OptimisticEntity(); + original2.setMessage("It's fine 2"); + + Iterable saved = operations.save(Arrays.asList(original1, original2)); + + saved.forEach(optimisticEntity -> { + assertThatSeqNoPrimaryTermIsFilled(optimisticEntity); + }); + } + @Test // DATAES-799 void getShouldReturnSeqNoPrimaryTerm() { OptimisticEntity original = new OptimisticEntity();