DATAES-876 - Add seqno and primary term to entity on initial save.

Original PR: #490
This commit is contained in:
Peter-Josef Meisch 2020-06-30 21:13:22 +02:00 committed by GitHub
parent 3782c8e738
commit bfe9d290a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 135 additions and 33 deletions

View File

@ -16,7 +16,8 @@ In the `ReactiveElasticsearchClient.Indices` interface the `updateMapping` metho
They do the same, but `putMapping` is consistent with the naming in the Elasticsearch API:
.Alias handling
In the `IndexOperations` interface the methods `addAlias(AliasQuery)`, `removeAlias(AliasQuery)` and `queryForAlias()` have been deprecated. The new methods `alias(AliasAction)`, `getAliases(String...)` and `getAliasesForIndex(String...)` offer more functionality and a cleaner API.
In the `IndexOperations` interface the methods `addAlias(AliasQuery)`, `removeAlias(AliasQuery)` and `queryForAlias()` have been deprecated.
The new methods `alias(AliasAction)`, `getAliases(String...)` and `getAliasesForIndex(String...)` offer more functionality and a cleaner API.
.Parent-ID
Usage of a parent-id has been removed from Elasticsearch since version 6. We now deprecate the corresponding fields and methods.
@ -38,3 +39,8 @@ With the introduction of the `ReactiveIndexOperations` it became necessary to ch
* the `createIndex` variants now return a `Mono<Boolean>` instead of a `Mono<Void>` to signal successful index creation.
* the `updateMapping` variants now return a `Mono<Boolean>` instead of a `Mono<Void>` to signal successful mappings storage.
=== Return types of DocumentOperartions.bulkIndex methods
These methods were returing a `List<String>` containing the ids of the new indexed records.
Now they return a `List<IndexedObjectInformation>`; these objects contain the id and information about optimistic locking (seq_no and primary_term)

View File

@ -25,6 +25,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.MultiSearchRequest;
@ -59,6 +60,7 @@ import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.SeqNoPrimaryTerm;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.data.elasticsearch.support.VersionInfo;
import org.springframework.data.mapping.PersistentPropertyAccessor;
import org.springframework.data.mapping.callback.EntityCallbacks;
import org.springframework.data.util.CloseableIterator;
import org.springframework.data.util.Streamable;
@ -175,11 +177,9 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper
.collect(Collectors.toList());
if (!indexQueries.isEmpty()) {
List<String> ids = bulkIndex(indexQueries, index);
Iterator<String> idIterator = ids.iterator();
entities.forEach(entity -> {
setPersistentEntityId(entity, idIterator.next());
});
List<IndexedObjectInformation> indexedObjectInformations = bulkIndex(indexQueries, index);
Iterator<IndexedObjectInformation> iterator = indexedObjectInformations.iterator();
entities.forEach(entity -> updateIndexedObject(entity, iterator.next()));
}
return indexQueries.stream().map(IndexQuery::getObject).map(entity -> (T) entity).collect(Collectors.toList());
@ -250,12 +250,12 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper
}
@Override
public List<String> bulkIndex(List<IndexQuery> queries, Class<?> clazz) {
public List<IndexedObjectInformation> bulkIndex(List<IndexQuery> queries, Class<?> clazz) {
return bulkIndex(queries, getIndexCoordinatesFor(clazz));
}
@Override
public List<String> bulkIndex(List<IndexQuery> queries, BulkOptions bulkOptions, Class<?> clazz) {
public List<IndexedObjectInformation> bulkIndex(List<IndexQuery> queries, BulkOptions bulkOptions, Class<?> clazz) {
return bulkIndex(queries, bulkOptions, getIndexCoordinatesFor(clazz));
}
@ -473,7 +473,7 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper
* @param bulkResponse
* @return the list of the item id's
*/
protected List<String> checkForBulkOperationFailure(BulkResponse bulkResponse) {
protected List<IndexedObjectInformation> checkForBulkOperationFailure(BulkResponse bulkResponse) {
if (bulkResponse.hasFailures()) {
Map<String, String> failedDocuments = new HashMap<>();
@ -488,17 +488,31 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper
failedDocuments);
}
return Stream.of(bulkResponse.getItems()).map(BulkItemResponse::getId).collect(Collectors.toList());
return Stream.of(bulkResponse.getItems()).map(bulkItemResponse -> {
DocWriteResponse response = bulkItemResponse.getResponse();
if (response != null) {
return IndexedObjectInformation.of(response.getId(), response.getSeqNo(), response.getPrimaryTerm());
} else {
return IndexedObjectInformation.of(bulkItemResponse.getId(), null, null);
}
}).collect(Collectors.toList());
}
protected void setPersistentEntityId(Object entity, String id) {
protected void updateIndexedObject(Object entity, IndexedObjectInformation indexedObjectInformation) {
ElasticsearchPersistentEntity<?> persistentEntity = getRequiredPersistentEntity(entity.getClass());
PersistentPropertyAccessor<Object> propertyAccessor = persistentEntity.getPropertyAccessor(entity);
ElasticsearchPersistentProperty idProperty = persistentEntity.getIdProperty();
// Only deal with text because ES generated Ids are strings!
if (idProperty != null && idProperty.getType().isAssignableFrom(String.class)) {
persistentEntity.getPropertyAccessor(entity).setProperty(idProperty, id);
propertyAccessor.setProperty(idProperty, indexedObjectInformation.getId());
}
if (persistentEntity.hasSeqNoPrimaryTermProperty()) {
ElasticsearchPersistentProperty seqNoPrimaryTermProperty = persistentEntity.getSeqNoPrimaryTermProperty();
propertyAccessor.setProperty(seqNoPrimaryTermProperty,
new SeqNoPrimaryTerm(indexedObjectInformation.getSeqNo(), indexedObjectInformation.getPrimaryTerm()));
}
}

View File

@ -157,10 +157,10 @@ public interface DocumentOperations {
*
* @param queries the queries to execute in bulk
* @param clazz the entity class
* @return the ids of the indexed objects
* @return the information about the indexed objects
* @since 4.1
*/
default List<String> bulkIndex(List<IndexQuery> queries, Class<?> clazz) {
default List<IndexedObjectInformation> bulkIndex(List<IndexQuery> queries, Class<?> clazz) {
return bulkIndex(queries, BulkOptions.defaultOptions(), clazz);
}
@ -168,9 +168,9 @@ public interface DocumentOperations {
* Bulk index all objects. Will do save or update.
*
* @param queries the queries to execute in bulk
* @return the ids of the indexed objects
* @return the information about of the indexed objects
*/
default List<String> bulkIndex(List<IndexQuery> queries, IndexCoordinates index) {
default List<IndexedObjectInformation> bulkIndex(List<IndexQuery> queries, IndexCoordinates index) {
return bulkIndex(queries, BulkOptions.defaultOptions(), index);
}
@ -180,19 +180,19 @@ public interface DocumentOperations {
* @param queries the queries to execute in bulk
* @param bulkOptions options to be added to the bulk request
* @param clazz the entity class
* @return the ids of the indexed objects
* @return the information about of the indexed objects
* @since 4.1
*/
List<String> bulkIndex(List<IndexQuery> queries, BulkOptions bulkOptions, Class<?> clazz);
List<IndexedObjectInformation> bulkIndex(List<IndexQuery> queries, BulkOptions bulkOptions, Class<?> clazz);
/**
* Bulk index all objects. Will do save or update.
*
* @param queries the queries to execute in bulk
* @param bulkOptions options to be added to the bulk request
* @return the ids of the indexed objects
* @return the information about of the indexed objects
*/
List<String> bulkIndex(List<IndexQuery> queries, BulkOptions bulkOptions, IndexCoordinates index);
List<IndexedObjectInformation> bulkIndex(List<IndexQuery> queries, BulkOptions bulkOptions, IndexCoordinates index);
/**
* Bulk update all objects. Will do update.

View File

@ -26,6 +26,7 @@ import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse;
@ -142,17 +143,18 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate {
maybeCallbackBeforeConvertWithQuery(query, index);
IndexRequest request = requestFactory.indexRequest(query, index);
String documentId = execute(client -> client.index(request, RequestOptions.DEFAULT).getId());
IndexResponse indexResponse = execute(client -> client.index(request, RequestOptions.DEFAULT));
// We should call this because we are not going through a mapper.
Object queryObject = query.getObject();
if (queryObject != null) {
setPersistentEntityId(queryObject, documentId);
updateIndexedObject(queryObject,
IndexedObjectInformation.of(indexResponse.getId(), indexResponse.getSeqNo(), indexResponse.getPrimaryTerm()));
}
maybeCallbackAfterSaveWithQuery(query, index);
return documentId;
return indexResponse.getId();
}
@Override
@ -186,7 +188,8 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate {
}
@Override
public List<String> bulkIndex(List<IndexQuery> queries, BulkOptions bulkOptions, IndexCoordinates index) {
public List<IndexedObjectInformation> bulkIndex(List<IndexQuery> queries, BulkOptions bulkOptions,
IndexCoordinates index) {
Assert.notNull(queries, "List of IndexQuery must not be null");
Assert.notNull(bulkOptions, "BulkOptions must not be null");
@ -234,13 +237,14 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate {
return new UpdateResponse(result);
}
private List<String> doBulkOperation(List<?> queries, BulkOptions bulkOptions, IndexCoordinates index) {
private List<IndexedObjectInformation> doBulkOperation(List<?> queries, BulkOptions bulkOptions,
IndexCoordinates index) {
maybeCallbackBeforeConvertWithQueries(queries, index);
BulkRequest bulkRequest = requestFactory.bulkRequest(queries, bulkOptions, index);
List<String> ids = checkForBulkOperationFailure(
List<IndexedObjectInformation> indexedObjectInformations = checkForBulkOperationFailure(
execute(client -> client.bulk(bulkRequest, RequestOptions.DEFAULT)));
maybeCallbackAfterSaveWithQueries(queries, index);
return ids;
return indexedObjectInformations;
}
// endregion

View File

@ -162,7 +162,8 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate {
// We should call this because we are not going through a mapper.
Object queryObject = query.getObject();
if (queryObject != null) {
setPersistentEntityId(queryObject, documentId);
updateIndexedObject(queryObject,
IndexedObjectInformation.of(documentId, response.getSeqNo(), response.getPrimaryTerm()));
}
maybeCallbackAfterSaveWithQuery(query, index);
@ -201,16 +202,17 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate {
}
@Override
public List<String> bulkIndex(List<IndexQuery> queries, BulkOptions bulkOptions, IndexCoordinates index) {
public List<IndexedObjectInformation> bulkIndex(List<IndexQuery> queries, BulkOptions bulkOptions,
IndexCoordinates index) {
Assert.notNull(queries, "List of IndexQuery must not be null");
Assert.notNull(bulkOptions, "BulkOptions must not be null");
List<String> ids = doBulkOperation(queries, bulkOptions, index);
List<IndexedObjectInformation> indexedObjectInformations = doBulkOperation(queries, bulkOptions, index);
maybeCallbackAfterSaveWithQueries(queries, index);
return ids;
return indexedObjectInformations;
}
@Override
@ -257,7 +259,8 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate {
return new UpdateResponse(result);
}
private List<String> doBulkOperation(List<?> queries, BulkOptions bulkOptions, IndexCoordinates index) {
private List<IndexedObjectInformation> doBulkOperation(List<?> queries, BulkOptions bulkOptions,
IndexCoordinates index) {
maybeCallbackBeforeConvertWithQueries(queries, index);
BulkRequestBuilder bulkRequest = requestFactory.bulkRequestBuilder(client, queries, bulkOptions, index);
return checkForBulkOperationFailure(bulkRequest.execute().actionGet());

View File

@ -0,0 +1,52 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core;
import org.springframework.lang.Nullable;
/**
* Value class capturing information about a newly indexed document in Elasticsearch.
*
* @author Peter-Josef Meisch
* @since 4.1
*/
public class IndexedObjectInformation {
private final String id;
@Nullable private final Long seqNo;
@Nullable private final Long primaryTerm;
private IndexedObjectInformation(String id, @Nullable Long seqNo, @Nullable Long primaryTerm) {
this.id = id;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
}
public static IndexedObjectInformation of(String id, @Nullable Long seqNo, @Nullable Long primaryTerm) {
return new IndexedObjectInformation(id, seqNo, primaryTerm);
}
public String getId() {
return id;
}
public long getSeqNo() {
return seqNo;
}
public long getPrimaryTerm() {
return primaryTerm;
}
}

View File

@ -3156,6 +3156,29 @@ public abstract class ElasticsearchTemplateTests {
assertThat(operations.exists("42", index)).isTrue();
}
@Test // DATAES-876
void shouldReturnSeqNoPrimaryTermOnSave() {
OptimisticEntity original = new OptimisticEntity();
original.setMessage("It's fine");
OptimisticEntity saved = operations.save(original);
assertThatSeqNoPrimaryTermIsFilled(saved);
}
@Test // DATAES-876
void shouldReturnSeqNoPrimaryTermOnBulkSave() {
OptimisticEntity original1 = new OptimisticEntity();
original1.setMessage("It's fine 1");
OptimisticEntity original2 = new OptimisticEntity();
original2.setMessage("It's fine 2");
Iterable<OptimisticEntity> saved = operations.save(Arrays.asList(original1, original2));
saved.forEach(optimisticEntity -> {
assertThatSeqNoPrimaryTermIsFilled(optimisticEntity);
});
}
@Test // DATAES-799
void getShouldReturnSeqNoPrimaryTerm() {
OptimisticEntity original = new OptimisticEntity();