DATAES-623 - Polishing.

This commit is contained in:
Peter-Josef Meisch 2020-01-13 20:35:03 +01:00
parent d7262e4370
commit e5ec8fdab3
4 changed files with 79 additions and 45 deletions

View File

@ -15,11 +15,6 @@
*/
package org.springframework.data.elasticsearch.core;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.BulkOptions;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -27,6 +22,12 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.BulkOptions;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.util.Assert;
/**
* The reactive operations for the
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs.html">Elasticsearch Document APIs</a>.
@ -230,6 +231,7 @@ public interface ReactiveDocumentOperations {
* @return a {@link Mono} emitting the {@literal id} of the removed document.
*/
Mono<String> deleteById(String id, Class<?> entityType, IndexCoordinates index);
/**
* Delete the documents matching the given {@link Query} extracting index and type from entity metadata.
*

View File

@ -15,6 +15,20 @@
*/
package org.springframework.data.elasticsearch.core;
import static org.elasticsearch.index.VersionType.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
@ -57,18 +71,17 @@ import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersiste
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
import org.springframework.data.elasticsearch.core.query.*;
import org.springframework.data.elasticsearch.core.query.BulkOptions;
import org.springframework.data.elasticsearch.core.query.CriteriaQuery;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.StringQuery;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.http.HttpStatus;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.*;
import java.util.stream.Collectors;
import static org.elasticsearch.index.VersionType.EXTERNAL;
/**
* @author Christoph Strobl
@ -165,8 +178,8 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
public <T> Flux<T> multiGet(Query query, Class<T> clazz, IndexCoordinates index) {
Assert.notNull(index, "Index must not be null");
Assert.notNull(clazz, "Class must not be null");
Assert.notNull(query, "Query must not be null");
Assert.notNull(clazz, "Class must not be null");
Assert.notNull(query, "Query must not be null");
Assert.notEmpty(query.getIds(), "No Id define for Query");
MultiGetRequest request = requestFactory.multiGetRequest(query, index);
@ -186,7 +199,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
Assert.notNull(queries, "List of UpdateQuery must not be null");
Assert.notNull(bulkOptions, "BulkOptions must not be null");
Assert.notNull(index, "Index must not be null");
Assert.notNull(index, "Index must not be null");
return doBulkOperation(queries, bulkOptions, index).then();
}
@ -215,9 +228,10 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
if (bulkResponse.hasFailures()) {
Map<String, String> failedDocuments = new HashMap<>();
for (BulkItemResponse item : bulkResponse.getItems()) {
if (item.isFailed()) {
failedDocuments.put(item.getId(), item.getFailureMessage());
}
failedDocuments.put(item.getId(), item.getFailureMessage());
}
}
ElasticsearchException exception = new ElasticsearchException(
"Bulk operation has failures. Use ElasticsearchException.getFailedDocuments() for detailed messages ["
@ -270,8 +284,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
private IndexRequest getIndexRequest(Object value, AdaptibleEntity<?> entity, IndexCoordinates index) {
Object id = entity.getId();
IndexRequest request = id != null
? new IndexRequest(index.getIndexName()).id(converter.convertId(id))
IndexRequest request = id != null ? new IndexRequest(index.getIndexName()).id(converter.convertId(id))
: new IndexRequest(index.getIndexName());
request.source(converter.mapObject(value).toJson(), Requests.INDEX_CONTENT_TYPE);
@ -279,6 +292,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
if (entity.isVersionedEntity()) {
Number version = entity.getVersion();
if (version != null) {
request.version(version.longValue());
request.versionType(EXTERNAL);
@ -294,8 +308,10 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
query.setId(id.toString());
}
query.setObject(value);
if (entity.isVersionedEntity()) {
Number version = entity.getVersion();
if (version != null) {
query.setVersion(version.longValue());
}

View File

@ -15,6 +15,9 @@
*/
package org.springframework.data.elasticsearch.repository.support;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.elasticsearch.index.query.QueryBuilders;
import org.reactivestreams.Publisher;
import org.springframework.data.domain.Pageable;
@ -27,8 +30,6 @@ import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.StringQuery;
import org.springframework.data.elasticsearch.repository.ReactiveElasticsearchRepository;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* @author Christoph Strobl
@ -70,8 +71,8 @@ public class SimpleReactiveElasticsearchRepository<T, ID> implements ReactiveEla
Assert.notNull(entityStream, "EntityStream must not be null!");
return elasticsearchOperations
.saveAll(Flux.from(entityStream).collectList(), entityInformation.getIndexCoordinates());
return elasticsearchOperations.saveAll(Flux.from(entityStream).collectList(),
entityInformation.getIndexCoordinates());
}
@Override
@ -191,9 +192,7 @@ public class SimpleReactiveElasticsearchRepository<T, ID> implements ReactiveEla
throw new IllegalStateException("Entity id must not be null!");
}
return convertId(id);
})
.collectList()
.map(objects -> {
}).collectList().map(objects -> {
return new StringQuery(QueryBuilders.idsQuery() //
.addIds(objects.toArray(new String[0])) //
@ -201,8 +200,8 @@ public class SimpleReactiveElasticsearchRepository<T, ID> implements ReactiveEla
}) //
.flatMap(query -> {
return elasticsearchOperations
.deleteBy(query, entityInformation.getJavaType(), entityInformation.getIndexCoordinates());
return elasticsearchOperations.deleteBy(query, entityInformation.getJavaType(),
entityInformation.getIndexCoordinates());
}) //
.then();
}

View File

@ -15,7 +15,30 @@
*/
package org.springframework.data.elasticsearch.core;
import lombok.*;
import static org.assertj.core.api.Assertions.*;
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.springframework.data.elasticsearch.annotations.FieldType.*;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.lang.Long;
import java.lang.Object;
import java.net.ConnectException;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.search.sort.FieldSortBuilder;
@ -35,24 +58,18 @@ import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.Score;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.*;
import org.springframework.data.elasticsearch.core.query.Criteria;
import org.springframework.data.elasticsearch.core.query.CriteriaQuery;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.StringQuery;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.data.elasticsearch.core.query.UpdateQueryBuilder;
import org.springframework.data.elasticsearch.junit.junit4.ElasticsearchVersion;
import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.net.ConnectException;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.springframework.data.elasticsearch.annotations.FieldType.Text;
/**
* Integration tests for {@link ReactiveElasticsearchTemplate}.