DATAES-908 - Fill version on an indexed entity.

Original PR: #506
This commit is contained in:
Roman Puchkovskiy 2020-08-19 23:18:44 +04:00 committed by GitHub
parent 131f0318cc
commit c82792b34d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 193 additions and 17 deletions

View File

@ -493,9 +493,10 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper
return Stream.of(bulkResponse.getItems()).map(bulkItemResponse -> {
DocWriteResponse response = bulkItemResponse.getResponse();
if (response != null) {
return IndexedObjectInformation.of(response.getId(), response.getSeqNo(), response.getPrimaryTerm());
return IndexedObjectInformation.of(response.getId(), response.getSeqNo(), response.getPrimaryTerm(),
response.getVersion());
} else {
return IndexedObjectInformation.of(bulkItemResponse.getId(), null, null);
return IndexedObjectInformation.of(bulkItemResponse.getId(), null, null, null);
}
}).collect(Collectors.toList());
@ -511,11 +512,17 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper
propertyAccessor.setProperty(idProperty, indexedObjectInformation.getId());
}
if (persistentEntity.hasSeqNoPrimaryTermProperty()) {
if (indexedObjectInformation.getSeqNo() != null && indexedObjectInformation.getPrimaryTerm() != null
&& persistentEntity.hasSeqNoPrimaryTermProperty()) {
ElasticsearchPersistentProperty seqNoPrimaryTermProperty = persistentEntity.getSeqNoPrimaryTermProperty();
propertyAccessor.setProperty(seqNoPrimaryTermProperty,
new SeqNoPrimaryTerm(indexedObjectInformation.getSeqNo(), indexedObjectInformation.getPrimaryTerm()));
}
if (indexedObjectInformation.getVersion() != null && persistentEntity.hasVersionProperty()) {
ElasticsearchPersistentProperty versionProperty = persistentEntity.getVersionProperty();
propertyAccessor.setProperty(versionProperty, indexedObjectInformation.getVersion());
}
}
ElasticsearchPersistentEntity<?> getRequiredPersistentEntity(Class<?> clazz) {
@ -662,6 +669,20 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper
// endregion
protected void updateIndexedObjectsWithQueries(List<?> queries,
List<IndexedObjectInformation> indexedObjectInformations) {
for (int i = 0; i < queries.size(); i++) {
Object query = queries.get(i);
if (query instanceof IndexQuery) {
IndexQuery indexQuery = (IndexQuery) query;
Object queryObject = indexQuery.getObject();
if (queryObject != null) {
updateIndexedObject(queryObject, indexedObjectInformations.get(i));
}
}
}
}
// region Document callbacks
protected interface DocumentCallback<T> {
@Nullable

View File

@ -149,7 +149,8 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate {
Object queryObject = query.getObject();
if (queryObject != null) {
updateIndexedObject(queryObject,
IndexedObjectInformation.of(indexResponse.getId(), indexResponse.getSeqNo(), indexResponse.getPrimaryTerm()));
IndexedObjectInformation.of(indexResponse.getId(), indexResponse.getSeqNo(),
indexResponse.getPrimaryTerm(), indexResponse.getVersion()));
}
maybeCallbackAfterSaveWithQuery(query, index);
@ -243,6 +244,7 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate {
BulkRequest bulkRequest = requestFactory.bulkRequest(queries, bulkOptions, index);
List<IndexedObjectInformation> indexedObjectInformations = checkForBulkOperationFailure(
execute(client -> client.bulk(bulkRequest, RequestOptions.DEFAULT)));
updateIndexedObjectsWithQueries(queries, indexedObjectInformations);
maybeCallbackAfterSaveWithQueries(queries, index);
return indexedObjectInformations;
}

View File

@ -163,7 +163,8 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate {
Object queryObject = query.getObject();
if (queryObject != null) {
updateIndexedObject(queryObject,
IndexedObjectInformation.of(documentId, response.getSeqNo(), response.getPrimaryTerm()));
IndexedObjectInformation.of(documentId, response.getSeqNo(), response.getPrimaryTerm(),
response.getVersion()));
}
maybeCallbackAfterSaveWithQuery(query, index);
@ -210,6 +211,8 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate {
List<IndexedObjectInformation> indexedObjectInformations = doBulkOperation(queries, bulkOptions, index);
updateIndexedObjectsWithQueries(queries, indexedObjectInformations);
maybeCallbackAfterSaveWithQueries(queries, index);
return indexedObjectInformations;

View File

@ -21,32 +21,44 @@ import org.springframework.lang.Nullable;
* Value class capturing information about a newly indexed document in Elasticsearch.
*
* @author Peter-Josef Meisch
* @author Roman Puchkovskiy
* @since 4.1
*/
public class IndexedObjectInformation {
private final String id;
@Nullable private final Long seqNo;
@Nullable private final Long primaryTerm;
@Nullable private final Long version;
private IndexedObjectInformation(String id, @Nullable Long seqNo, @Nullable Long primaryTerm) {
private IndexedObjectInformation(String id, @Nullable Long seqNo, @Nullable Long primaryTerm,
@Nullable Long version) {
this.id = id;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.version = version;
}
public static IndexedObjectInformation of(String id, @Nullable Long seqNo, @Nullable Long primaryTerm) {
return new IndexedObjectInformation(id, seqNo, primaryTerm);
public static IndexedObjectInformation of(String id, @Nullable Long seqNo, @Nullable Long primaryTerm,
@Nullable Long version) {
return new IndexedObjectInformation(id, seqNo, primaryTerm, version);
}
public String getId() {
return id;
}
public long getSeqNo() {
@Nullable
public Long getSeqNo() {
return seqNo;
}
public long getPrimaryTerm() {
@Nullable
public Long getPrimaryTerm() {
return primaryTerm;
}
@Nullable
public Long getVersion() {
return version;
}
}

View File

@ -15,6 +15,8 @@
*/
package org.springframework.data.elasticsearch.core;
import org.elasticsearch.action.DocWriteResponse;
import org.springframework.data.mapping.PersistentPropertyAccessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
@ -204,9 +206,9 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
.map(it -> {
T savedEntity = it.getT1();
IndexResponse indexResponse = it.getT2();
AdaptibleEntity<T> adaptableEntity = operations.forEntity(savedEntity, converter.getConversionService());
// noinspection ReactiveStreamsNullableInLambdaInTransform
return adaptableEntity.populateIdIfNecessary(indexResponse.getId());
return updateIndexedObject(savedEntity,
IndexedObjectInformation.of(indexResponse.getId(), indexResponse.getSeqNo(),
indexResponse.getPrimaryTerm(), indexResponse.getVersion()));
}).flatMap(saved -> maybeCallAfterSave(saved, index));
}
@ -241,15 +243,42 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
T savedEntity = entities.entityAt(indexAndResponse.getT1());
BulkItemResponse bulkItemResponse = indexAndResponse.getT2();
AdaptibleEntity<T> adaptibleEntity = operations.forEntity(savedEntity,
converter.getConversionService());
adaptibleEntity.populateIdIfNecessary(bulkItemResponse.getResponse().getId());
DocWriteResponse response = bulkItemResponse.getResponse();
updateIndexedObject(savedEntity,
IndexedObjectInformation.of(response.getId(), response.getSeqNo(),
response.getPrimaryTerm(), response.getVersion()));
return maybeCallAfterSave(savedEntity, index);
});
});
}
private <T> T updateIndexedObject(T entity, IndexedObjectInformation indexedObjectInformation) {
AdaptibleEntity<T> adaptibleEntity = operations.forEntity(entity, converter.getConversionService());
adaptibleEntity.populateIdIfNecessary(indexedObjectInformation.getId());
ElasticsearchPersistentEntity<?> persistentEntity = getRequiredPersistentEntity(entity.getClass());
PersistentPropertyAccessor<Object> propertyAccessor = persistentEntity.getPropertyAccessor(entity);
if (indexedObjectInformation.getSeqNo() != null && indexedObjectInformation.getPrimaryTerm() != null
&& persistentEntity.hasSeqNoPrimaryTermProperty()) {
ElasticsearchPersistentProperty seqNoPrimaryTermProperty = persistentEntity.getSeqNoPrimaryTermProperty();
propertyAccessor.setProperty(seqNoPrimaryTermProperty,
new SeqNoPrimaryTerm(indexedObjectInformation.getSeqNo(), indexedObjectInformation.getPrimaryTerm()));
}
if (indexedObjectInformation.getVersion() != null && persistentEntity.hasVersionProperty()) {
ElasticsearchPersistentProperty versionProperty = persistentEntity.getVersionProperty();
propertyAccessor.setProperty(versionProperty, indexedObjectInformation.getVersion());
}
return entity;
}
private ElasticsearchPersistentEntity<?> getRequiredPersistentEntity(Class<?> clazz) {
return converter.getMappingContext().getRequiredPersistentEntity(clazz);
}
@Override
public <T> Flux<T> multiGet(Query query, Class<T> clazz) {
return multiGet(query, clazz, getIndexCoordinatesFor(clazz));

View File

@ -37,6 +37,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -3448,6 +3449,68 @@ public abstract class ElasticsearchTemplateTests {
return ((AbstractElasticsearchTemplate) operations).getRequestFactory();
}
@Test // DATAES-908
void shouldFillVersionOnSaveOne() {
VersionedEntity saved = operations.save(new VersionedEntity());
assertThat(saved.getVersion()).isNotNull();
}
@Test // DATAES-908
void shouldFillVersionOnSaveIterable() {
List<VersionedEntity> iterable = Arrays.asList(new VersionedEntity(), new VersionedEntity());
Iterator<VersionedEntity> results = operations.save(iterable).iterator();
VersionedEntity saved1 = results.next();
VersionedEntity saved2 = results.next();
assertThat(saved1.getVersion()).isNotNull();
assertThat(saved2.getVersion()).isNotNull();
}
@Test // DATAES-908
void shouldFillVersionOnSaveArray() {
VersionedEntity[] array = {new VersionedEntity(), new VersionedEntity()};
Iterator<VersionedEntity> results = operations.save(array).iterator();
VersionedEntity saved1 = results.next();
VersionedEntity saved2= results.next();
assertThat(saved1.getVersion()).isNotNull();
assertThat(saved2.getVersion()).isNotNull();
}
@Test // DATAES-908
void shouldFillVersionOnIndexOne() {
VersionedEntity entity = new VersionedEntity();
IndexQuery query = new IndexQueryBuilder().withObject(entity).build();
operations.index(query, operations.getIndexCoordinatesFor(VersionedEntity.class));
assertThat(entity.getVersion()).isNotNull();
}
@Test // DATAES-908
void shouldFillVersionOnBulkIndex() {
VersionedEntity entity1 = new VersionedEntity();
VersionedEntity entity2= new VersionedEntity();
IndexQuery query1 = new IndexQueryBuilder().withObject(entity1).build();
IndexQuery query2 = new IndexQueryBuilder().withObject(entity2).build();
operations.bulkIndex(Arrays.asList(query1, query2), VersionedEntity.class);
assertThat(entity1.getVersion()).isNotNull();
assertThat(entity2.getVersion()).isNotNull();
}
@Test // DATAES-908
void shouldFillSeqNoPrimaryKeyOnBulkIndex() {
OptimisticEntity entity1 = new OptimisticEntity();
OptimisticEntity entity2 = new OptimisticEntity();
IndexQuery query1 = new IndexQueryBuilder().withObject(entity1).build();
IndexQuery query2 = new IndexQueryBuilder().withObject(entity2).build();
operations.bulkIndex(Arrays.asList(query1, query2), OptimisticEntity.class);
assertThatSeqNoPrimaryTermIsFilled(entity1);
assertThatSeqNoPrimaryTermIsFilled(entity2);
}
@Data
@NoArgsConstructor
@AllArgsConstructor
@ -3625,6 +3688,13 @@ public abstract class ElasticsearchTemplateTests {
@Version private Long version;
}
@Data
@Document(indexName = "test-index-versioned-entity-template")
static class VersionedEntity {
@Id private String id;
@Version private Long version;
}
@Data
@NoArgsConstructor
@AllArgsConstructor

View File

@ -390,7 +390,9 @@ public class ReactiveElasticsearchTemplateTests {
template.search(query, SampleEntity.class) //
.map(SearchHit::getContent) //
.as(StepVerifier::create) //
.expectNext(shouldMatch) //
.assertNext(next -> {
assertThat(next.getMessage()).isEqualTo("test message");
}) //
.verifyComplete();
}
@ -963,6 +965,36 @@ public class ReactiveElasticsearchTemplateTests {
template.save(forEdit).as(StepVerifier::create).expectNextCount(1).verifyComplete();
}
@Test // DATAES-908
void shouldFillVersionOnSaveOne() {
VersionedEntity saved = template.save(new VersionedEntity()).block();
assertThat(saved.getVersion()).isNotNull();
}
@Test // DATAES-908
void shouldFillVersionOnSaveAll() {
VersionedEntity saved = template.saveAll(singletonList(new VersionedEntity()), VersionedEntity.class)
.blockLast();
assertThat(saved.getVersion()).isNotNull();
}
@Test // DATAES-908
void shouldFillSeqNoPrimaryTermOnSaveOne() {
OptimisticEntity saved = template.save(new OptimisticEntity()).block();
assertThatSeqNoPrimaryTermIsFilled(saved);
}
@Test // DATAES-908
void shouldFillSeqNoPrimaryTermOnSaveAll() {
OptimisticEntity saved = template.saveAll(singletonList(new OptimisticEntity()), OptimisticEntity.class)
.blockLast();
assertThatSeqNoPrimaryTermIsFilled(saved);
}
@Data
@Document(indexName = "marvel")
static class Person {
@ -1051,4 +1083,11 @@ public class ReactiveElasticsearchTemplateTests {
private SeqNoPrimaryTerm seqNoPrimaryTerm;
@Version private Long version;
}
@Data
@Document(indexName = "test-index-reactive-versioned-entity-template")
static class VersionedEntity {
@Id private String id;
@Version private Long version;
}
}