From c82792b34d09e26d1cbb7f46d2b848684e55aaed Mon Sep 17 00:00:00 2001 From: Roman Puchkovskiy Date: Wed, 19 Aug 2020 23:18:44 +0400 Subject: [PATCH] DATAES-908 - Fill version on an indexed entity. Original PR: #506 --- .../core/AbstractElasticsearchTemplate.java | 27 ++++++- .../core/ElasticsearchRestTemplate.java | 4 +- .../core/ElasticsearchTemplate.java | 5 +- .../core/IndexedObjectInformation.java | 22 ++++-- .../core/ReactiveElasticsearchTemplate.java | 41 +++++++++-- .../core/ElasticsearchTemplateTests.java | 70 +++++++++++++++++++ .../ReactiveElasticsearchTemplateTests.java | 41 ++++++++++- 7 files changed, 193 insertions(+), 17 deletions(-) diff --git a/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java index 4a891ace3..2e5b29519 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java @@ -493,9 +493,10 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper return Stream.of(bulkResponse.getItems()).map(bulkItemResponse -> { DocWriteResponse response = bulkItemResponse.getResponse(); if (response != null) { - return IndexedObjectInformation.of(response.getId(), response.getSeqNo(), response.getPrimaryTerm()); + return IndexedObjectInformation.of(response.getId(), response.getSeqNo(), response.getPrimaryTerm(), + response.getVersion()); } else { - return IndexedObjectInformation.of(bulkItemResponse.getId(), null, null); + return IndexedObjectInformation.of(bulkItemResponse.getId(), null, null, null); } }).collect(Collectors.toList()); @@ -511,11 +512,17 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper propertyAccessor.setProperty(idProperty, indexedObjectInformation.getId()); } - if (persistentEntity.hasSeqNoPrimaryTermProperty()) { + if (indexedObjectInformation.getSeqNo() != null && indexedObjectInformation.getPrimaryTerm() != null + && persistentEntity.hasSeqNoPrimaryTermProperty()) { ElasticsearchPersistentProperty seqNoPrimaryTermProperty = persistentEntity.getSeqNoPrimaryTermProperty(); propertyAccessor.setProperty(seqNoPrimaryTermProperty, new SeqNoPrimaryTerm(indexedObjectInformation.getSeqNo(), indexedObjectInformation.getPrimaryTerm())); } + + if (indexedObjectInformation.getVersion() != null && persistentEntity.hasVersionProperty()) { + ElasticsearchPersistentProperty versionProperty = persistentEntity.getVersionProperty(); + propertyAccessor.setProperty(versionProperty, indexedObjectInformation.getVersion()); + } } ElasticsearchPersistentEntity getRequiredPersistentEntity(Class clazz) { @@ -662,6 +669,20 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper // endregion + protected void updateIndexedObjectsWithQueries(List queries, + List indexedObjectInformations) { + for (int i = 0; i < queries.size(); i++) { + Object query = queries.get(i); + if (query instanceof IndexQuery) { + IndexQuery indexQuery = (IndexQuery) query; + Object queryObject = indexQuery.getObject(); + if (queryObject != null) { + updateIndexedObject(queryObject, indexedObjectInformations.get(i)); + } + } + } + } + // region Document callbacks protected interface DocumentCallback { @Nullable diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java index 033cc9abb..6a3b0db60 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java @@ -149,7 +149,8 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate { Object queryObject = query.getObject(); if (queryObject != null) { updateIndexedObject(queryObject, - IndexedObjectInformation.of(indexResponse.getId(), indexResponse.getSeqNo(), indexResponse.getPrimaryTerm())); + IndexedObjectInformation.of(indexResponse.getId(), indexResponse.getSeqNo(), + indexResponse.getPrimaryTerm(), indexResponse.getVersion())); } maybeCallbackAfterSaveWithQuery(query, index); @@ -243,6 +244,7 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate { BulkRequest bulkRequest = requestFactory.bulkRequest(queries, bulkOptions, index); List indexedObjectInformations = checkForBulkOperationFailure( execute(client -> client.bulk(bulkRequest, RequestOptions.DEFAULT))); + updateIndexedObjectsWithQueries(queries, indexedObjectInformations); maybeCallbackAfterSaveWithQueries(queries, index); return indexedObjectInformations; } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java index 86124a819..493496c7e 100755 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java @@ -163,7 +163,8 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate { Object queryObject = query.getObject(); if (queryObject != null) { updateIndexedObject(queryObject, - IndexedObjectInformation.of(documentId, response.getSeqNo(), response.getPrimaryTerm())); + IndexedObjectInformation.of(documentId, response.getSeqNo(), response.getPrimaryTerm(), + response.getVersion())); } maybeCallbackAfterSaveWithQuery(query, index); @@ -210,6 +211,8 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate { List indexedObjectInformations = doBulkOperation(queries, bulkOptions, index); + updateIndexedObjectsWithQueries(queries, indexedObjectInformations); + maybeCallbackAfterSaveWithQueries(queries, index); return indexedObjectInformations; diff --git a/src/main/java/org/springframework/data/elasticsearch/core/IndexedObjectInformation.java b/src/main/java/org/springframework/data/elasticsearch/core/IndexedObjectInformation.java index 436f58e23..f3340e297 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/IndexedObjectInformation.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/IndexedObjectInformation.java @@ -21,32 +21,44 @@ import org.springframework.lang.Nullable; * Value class capturing information about a newly indexed document in Elasticsearch. * * @author Peter-Josef Meisch + * @author Roman Puchkovskiy * @since 4.1 */ public class IndexedObjectInformation { private final String id; @Nullable private final Long seqNo; @Nullable private final Long primaryTerm; + @Nullable private final Long version; - private IndexedObjectInformation(String id, @Nullable Long seqNo, @Nullable Long primaryTerm) { + private IndexedObjectInformation(String id, @Nullable Long seqNo, @Nullable Long primaryTerm, + @Nullable Long version) { this.id = id; this.seqNo = seqNo; this.primaryTerm = primaryTerm; + this.version = version; } - public static IndexedObjectInformation of(String id, @Nullable Long seqNo, @Nullable Long primaryTerm) { - return new IndexedObjectInformation(id, seqNo, primaryTerm); + public static IndexedObjectInformation of(String id, @Nullable Long seqNo, @Nullable Long primaryTerm, + @Nullable Long version) { + return new IndexedObjectInformation(id, seqNo, primaryTerm, version); } public String getId() { return id; } - public long getSeqNo() { + @Nullable + public Long getSeqNo() { return seqNo; } - public long getPrimaryTerm() { + @Nullable + public Long getPrimaryTerm() { return primaryTerm; } + + @Nullable + public Long getVersion() { + return version; + } } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java index a3c8b135f..e0a4e56a8 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java @@ -15,6 +15,8 @@ */ package org.springframework.data.elasticsearch.core; +import org.elasticsearch.action.DocWriteResponse; +import org.springframework.data.mapping.PersistentPropertyAccessor; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; @@ -204,9 +206,9 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera .map(it -> { T savedEntity = it.getT1(); IndexResponse indexResponse = it.getT2(); - AdaptibleEntity adaptableEntity = operations.forEntity(savedEntity, converter.getConversionService()); - // noinspection ReactiveStreamsNullableInLambdaInTransform - return adaptableEntity.populateIdIfNecessary(indexResponse.getId()); + return updateIndexedObject(savedEntity, + IndexedObjectInformation.of(indexResponse.getId(), indexResponse.getSeqNo(), + indexResponse.getPrimaryTerm(), indexResponse.getVersion())); }).flatMap(saved -> maybeCallAfterSave(saved, index)); } @@ -241,15 +243,42 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera T savedEntity = entities.entityAt(indexAndResponse.getT1()); BulkItemResponse bulkItemResponse = indexAndResponse.getT2(); - AdaptibleEntity adaptibleEntity = operations.forEntity(savedEntity, - converter.getConversionService()); - adaptibleEntity.populateIdIfNecessary(bulkItemResponse.getResponse().getId()); + DocWriteResponse response = bulkItemResponse.getResponse(); + updateIndexedObject(savedEntity, + IndexedObjectInformation.of(response.getId(), response.getSeqNo(), + response.getPrimaryTerm(), response.getVersion())); return maybeCallAfterSave(savedEntity, index); }); }); } + private T updateIndexedObject(T entity, IndexedObjectInformation indexedObjectInformation) { + AdaptibleEntity adaptibleEntity = operations.forEntity(entity, converter.getConversionService()); + adaptibleEntity.populateIdIfNecessary(indexedObjectInformation.getId()); + + ElasticsearchPersistentEntity persistentEntity = getRequiredPersistentEntity(entity.getClass()); + PersistentPropertyAccessor propertyAccessor = persistentEntity.getPropertyAccessor(entity); + + if (indexedObjectInformation.getSeqNo() != null && indexedObjectInformation.getPrimaryTerm() != null + && persistentEntity.hasSeqNoPrimaryTermProperty()) { + ElasticsearchPersistentProperty seqNoPrimaryTermProperty = persistentEntity.getSeqNoPrimaryTermProperty(); + propertyAccessor.setProperty(seqNoPrimaryTermProperty, + new SeqNoPrimaryTerm(indexedObjectInformation.getSeqNo(), indexedObjectInformation.getPrimaryTerm())); + } + + if (indexedObjectInformation.getVersion() != null && persistentEntity.hasVersionProperty()) { + ElasticsearchPersistentProperty versionProperty = persistentEntity.getVersionProperty(); + propertyAccessor.setProperty(versionProperty, indexedObjectInformation.getVersion()); + } + + return entity; + } + + private ElasticsearchPersistentEntity getRequiredPersistentEntity(Class clazz) { + return converter.getMappingContext().getRequiredPersistentEntity(clazz); + } + @Override public Flux multiGet(Query query, Class clazz) { return multiGet(query, clazz, getIndexCoordinatesFor(clazz)); diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java index 7ffda4062..c6b4d4cdb 100755 --- a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -3448,6 +3449,68 @@ public abstract class ElasticsearchTemplateTests { return ((AbstractElasticsearchTemplate) operations).getRequestFactory(); } + @Test // DATAES-908 + void shouldFillVersionOnSaveOne() { + VersionedEntity saved = operations.save(new VersionedEntity()); + + assertThat(saved.getVersion()).isNotNull(); + } + + @Test // DATAES-908 + void shouldFillVersionOnSaveIterable() { + List iterable = Arrays.asList(new VersionedEntity(), new VersionedEntity()); + Iterator results = operations.save(iterable).iterator(); + VersionedEntity saved1 = results.next(); + VersionedEntity saved2 = results.next(); + + assertThat(saved1.getVersion()).isNotNull(); + assertThat(saved2.getVersion()).isNotNull(); + } + + @Test // DATAES-908 + void shouldFillVersionOnSaveArray() { + VersionedEntity[] array = {new VersionedEntity(), new VersionedEntity()}; + Iterator results = operations.save(array).iterator(); + VersionedEntity saved1 = results.next(); + VersionedEntity saved2= results.next(); + + assertThat(saved1.getVersion()).isNotNull(); + assertThat(saved2.getVersion()).isNotNull(); + } + + @Test // DATAES-908 + void shouldFillVersionOnIndexOne() { + VersionedEntity entity = new VersionedEntity(); + IndexQuery query = new IndexQueryBuilder().withObject(entity).build(); + operations.index(query, operations.getIndexCoordinatesFor(VersionedEntity.class)); + + assertThat(entity.getVersion()).isNotNull(); + } + + @Test // DATAES-908 + void shouldFillVersionOnBulkIndex() { + VersionedEntity entity1 = new VersionedEntity(); + VersionedEntity entity2= new VersionedEntity(); + IndexQuery query1 = new IndexQueryBuilder().withObject(entity1).build(); + IndexQuery query2 = new IndexQueryBuilder().withObject(entity2).build(); + operations.bulkIndex(Arrays.asList(query1, query2), VersionedEntity.class); + + assertThat(entity1.getVersion()).isNotNull(); + assertThat(entity2.getVersion()).isNotNull(); + } + + @Test // DATAES-908 + void shouldFillSeqNoPrimaryKeyOnBulkIndex() { + OptimisticEntity entity1 = new OptimisticEntity(); + OptimisticEntity entity2 = new OptimisticEntity(); + IndexQuery query1 = new IndexQueryBuilder().withObject(entity1).build(); + IndexQuery query2 = new IndexQueryBuilder().withObject(entity2).build(); + operations.bulkIndex(Arrays.asList(query1, query2), OptimisticEntity.class); + + assertThatSeqNoPrimaryTermIsFilled(entity1); + assertThatSeqNoPrimaryTermIsFilled(entity2); + } + @Data @NoArgsConstructor @AllArgsConstructor @@ -3625,6 +3688,13 @@ public abstract class ElasticsearchTemplateTests { @Version private Long version; } + @Data + @Document(indexName = "test-index-versioned-entity-template") + static class VersionedEntity { + @Id private String id; + @Version private Long version; + } + @Data @NoArgsConstructor @AllArgsConstructor diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java index 4488ea42a..7e67883df 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateTests.java @@ -390,7 +390,9 @@ public class ReactiveElasticsearchTemplateTests { template.search(query, SampleEntity.class) // .map(SearchHit::getContent) // .as(StepVerifier::create) // - .expectNext(shouldMatch) // + .assertNext(next -> { + assertThat(next.getMessage()).isEqualTo("test message"); + }) // .verifyComplete(); } @@ -963,6 +965,36 @@ public class ReactiveElasticsearchTemplateTests { template.save(forEdit).as(StepVerifier::create).expectNextCount(1).verifyComplete(); } + @Test // DATAES-908 + void shouldFillVersionOnSaveOne() { + VersionedEntity saved = template.save(new VersionedEntity()).block(); + + assertThat(saved.getVersion()).isNotNull(); + } + + @Test // DATAES-908 + void shouldFillVersionOnSaveAll() { + VersionedEntity saved = template.saveAll(singletonList(new VersionedEntity()), VersionedEntity.class) + .blockLast(); + + assertThat(saved.getVersion()).isNotNull(); + } + + @Test // DATAES-908 + void shouldFillSeqNoPrimaryTermOnSaveOne() { + OptimisticEntity saved = template.save(new OptimisticEntity()).block(); + + assertThatSeqNoPrimaryTermIsFilled(saved); + } + + @Test // DATAES-908 + void shouldFillSeqNoPrimaryTermOnSaveAll() { + OptimisticEntity saved = template.saveAll(singletonList(new OptimisticEntity()), OptimisticEntity.class) + .blockLast(); + + assertThatSeqNoPrimaryTermIsFilled(saved); + } + @Data @Document(indexName = "marvel") static class Person { @@ -1051,4 +1083,11 @@ public class ReactiveElasticsearchTemplateTests { private SeqNoPrimaryTerm seqNoPrimaryTerm; @Version private Long version; } + + @Data + @Document(indexName = "test-index-reactive-versioned-entity-template") + static class VersionedEntity { + @Id private String id; + @Version private Long version; + } }