DATAES-678 - Introduce ReactiveIndexOperations.

Original PR: #481
This commit is contained in:
Peter-Josef Meisch 2020-06-13 17:08:48 +02:00 committed by GitHub
parent aeaa27cb99
commit b177dd1681
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 1525 additions and 209 deletions

View File

@ -3,16 +3,31 @@
This section describes breaking changes from version 4.0.x to 4.1.x and how removed features can be replaced by new introduced features.
[[elasticsearch-migration-guide-4.0-4.1.deprecations]]
== Deprecations
.Definition of the id property
It is possible to define a property of en entity as the id property by naming it either `id` or `document`. This behaviour is now deprecated and will produce a warning. PLease us the `@Id` annotation to mark a property as being the id property.
It is possible to define a property of en entity as the id property by naming it either `id` or `document`.
This behaviour is now deprecated and will produce a warning.
PLease us the `@Id` annotation to mark a property as being the id property.
In the `ReactiveElasticsearchClient.Indices` interface the `updateMapping` methods are deprecated in favour of the `putMapping` methods.
They do the same, but `putMapping` is consistent with the naming in the Elasticsearch API:
[[elasticsearch-migration-guide-4.0-4.1.removal]]
== Removals
.Type mappings
The _type mappings_ parameters of the `@Document` annotation and the `IndexCoordinates` object were removed. They had been deprecated in Spring Data Elasticsearch 4.0 and their values weren't used anymore.
The _type mappings_ parameters of the `@Document` annotation and the `IndexCoordinates` object were removed.
They had been deprecated in Spring Data Elasticsearch 4.0 and their values weren't used anymore.
=== Breaking changes
[[elasticsearch-migration-guide-4.0-4.1.breaking-changes]]
== Breaking Changes
=== Return types of ReactiveElasticsearchClient.Indices methods
The methods in the `ReactiveElasticsearchClient.Indices` were not used up to now.
With the introduction of the `ReactiveIndexOperations` it became necessary to change some of the return types:
* the `createIndex` variants now return a `Mono<Boolean>` instead of a `Mono<Void>` to signal successful index creation.
* the `updateMapping` variants now return a `Mono<Boolean>` instead of a `Mono<Void>` to signal successful mappings storage.

View File

@ -23,6 +23,10 @@ import org.springframework.dao.UncategorizedDataAccessException;
*/
public class UncategorizedElasticsearchException extends UncategorizedDataAccessException {
public UncategorizedElasticsearchException(String msg) {
super(msg, null);
}
public UncategorizedElasticsearchException(String msg, Throwable cause) {
super(msg, cause);
}

View File

@ -55,10 +55,14 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
@ -551,26 +555,20 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
.next();
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#deleteIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest)
*/
@Override
public Mono<Void> deleteIndex(HttpHeaders headers, DeleteIndexRequest request) {
public Mono<Boolean> deleteIndex(HttpHeaders headers, DeleteIndexRequest request) {
return sendRequest(request, requestCreator.indexDelete(), AcknowledgedResponse.class, headers) //
.then();
.map(AcknowledgedResponse::isAcknowledged) //
.next();
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#createIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.create.CreateIndexRequest)
*/
@Override
public Mono<Void> createIndex(HttpHeaders headers, CreateIndexRequest createIndexRequest) {
public Mono<Boolean> createIndex(HttpHeaders headers, CreateIndexRequest createIndexRequest) {
return sendRequest(createIndexRequest, requestCreator.indexCreate(), AcknowledgedResponse.class, headers) //
.then();
.map(AcknowledgedResponse::isAcknowledged) //
.next();
}
/*
@ -606,15 +604,12 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
.then();
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#updateMapping(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest)
*/
@Override
public Mono<Void> updateMapping(HttpHeaders headers, PutMappingRequest putMappingRequest) {
public Mono<Boolean> putMapping(HttpHeaders headers, PutMappingRequest putMappingRequest) {
return sendRequest(putMappingRequest, requestCreator.putMapping(), AcknowledgedResponse.class, headers) //
.then();
.map(AcknowledgedResponse::isAcknowledged) //
.next();
}
/*
@ -628,6 +623,16 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
.then();
}
@Override
public Mono<GetMappingsResponse> getMapping(HttpHeaders headers, GetMappingsRequest getMappingsRequest) {
return sendRequest(getMappingsRequest, requestCreator.getMapping(), GetMappingsResponse.class, headers).next();
}
@Override
public Mono<GetSettingsResponse> getSettings(HttpHeaders headers, GetSettingsRequest getSettingsRequest) {
return sendRequest(getSettingsRequest, requestCreator.getSettings(), GetSettingsResponse.class, headers).next();
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.ReactiveElasticsearchClientCallback)

View File

@ -27,9 +27,13 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
@ -416,11 +420,10 @@ public interface ReactiveElasticsearchClient {
/**
* Execute the given {@link SearchRequest} with aggregations against the {@literal search} API.
*
* @param consumer
* never {@literal null}.
* @param consumer never {@literal null}.
* @return the {@link Flux} emitting {@link Aggregation} one by one.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html">Search API on
* elastic.co</a>
* elastic.co</a>
* @since 4.0
*/
default Flux<Aggregation> aggregate(Consumer<SearchRequest> consumer) {
@ -674,7 +677,7 @@ public interface ReactiveElasticsearchClient {
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-delete-index.html"> Indices
* Delete API on elastic.co</a>
*/
default Mono<Void> deleteIndex(Consumer<DeleteIndexRequest> consumer) {
default Mono<Boolean> deleteIndex(Consumer<DeleteIndexRequest> consumer) {
DeleteIndexRequest request = new DeleteIndexRequest();
consumer.accept(request);
@ -690,7 +693,7 @@ public interface ReactiveElasticsearchClient {
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-delete-index.html"> Indices
* Delete API on elastic.co</a>
*/
default Mono<Void> deleteIndex(DeleteIndexRequest deleteIndexRequest) {
default Mono<Boolean> deleteIndex(DeleteIndexRequest deleteIndexRequest) {
return deleteIndex(HttpHeaders.EMPTY, deleteIndexRequest);
}
@ -704,18 +707,18 @@ public interface ReactiveElasticsearchClient {
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-delete-index.html"> Indices
* Delete API on elastic.co</a>
*/
Mono<Void> deleteIndex(HttpHeaders headers, DeleteIndexRequest deleteIndexRequest);
Mono<Boolean> deleteIndex(HttpHeaders headers, DeleteIndexRequest deleteIndexRequest);
/**
* Execute the given {@link CreateIndexRequest} against the {@literal indices} API.
*
* @param consumer never {@literal null}.
* @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index
* already exist.
* @return a {@link Mono} signalling successful operation completion or an {@link Mono#error(Throwable) error} if
* eg. the index already exist.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html"> Indices
* Create API on elastic.co</a>
*/
default Mono<Void> createIndex(Consumer<CreateIndexRequest> consumer) {
default Mono<Boolean> createIndex(Consumer<CreateIndexRequest> consumer) {
CreateIndexRequest request = new CreateIndexRequest();
consumer.accept(request);
@ -726,12 +729,12 @@ public interface ReactiveElasticsearchClient {
* Execute the given {@link CreateIndexRequest} against the {@literal indices} API.
*
* @param createIndexRequest must not be {@literal null}.
* @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index
* already exist.
* @return a {@link Mono} signalling successful operation completion or an {@link Mono#error(Throwable) error} if
* eg. the index already exist.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html"> Indices
* Create API on elastic.co</a>
*/
default Mono<Void> createIndex(CreateIndexRequest createIndexRequest) {
default Mono<Boolean> createIndex(CreateIndexRequest createIndexRequest) {
return createIndex(HttpHeaders.EMPTY, createIndexRequest);
}
@ -740,12 +743,12 @@ public interface ReactiveElasticsearchClient {
*
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
* @param createIndexRequest must not be {@literal null}.
* @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index
* already exist.
* @return a {@link Mono} signalling successful operation completion or an {@link Mono#error(Throwable) error} if
* eg. the index already exist.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html"> Indices
* Create API on elastic.co</a>
*/
Mono<Void> createIndex(HttpHeaders headers, CreateIndexRequest createIndexRequest);
Mono<Boolean> createIndex(HttpHeaders headers, CreateIndexRequest createIndexRequest);
/**
* Execute the given {@link OpenIndexRequest} against the {@literal indices} API.
@ -878,12 +881,58 @@ public interface ReactiveElasticsearchClient {
* does not exist.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html"> Indices
* Put Mapping API on elastic.co</a>
* @deprecated since 4.1, use {@link #putMapping(Consumer)}
*/
default Mono<Void> updateMapping(Consumer<PutMappingRequest> consumer) {
@Deprecated
default Mono<Boolean> updateMapping(Consumer<PutMappingRequest> consumer) {
return putMapping(consumer);
}
/**
* Execute the given {@link PutMappingRequest} against the {@literal indices} API.
*
* @param putMappingRequest must not be {@literal null}.
* @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index
* does not exist.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html"> Indices
* Put Mapping API on elastic.co</a>
* @deprecated since 4.1, use {@link #putMapping(PutMappingRequest)}
*/
@Deprecated
default Mono<Boolean> updateMapping(PutMappingRequest putMappingRequest) {
return putMapping(putMappingRequest);
}
/**
* Execute the given {@link PutMappingRequest} against the {@literal indices} API.
*
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
* @param putMappingRequest must not be {@literal null}.
* @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index
* does not exist.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html"> Indices
* Put Mapping API on elastic.co</a>
* @deprecated since 4.1, use {@link #putMapping(HttpHeaders, PutMappingRequest)}
*/
@Deprecated
default Mono<Boolean> updateMapping(HttpHeaders headers, PutMappingRequest putMappingRequest) {
return putMapping(headers, putMappingRequest);
}
/**
* Execute the given {@link PutMappingRequest} against the {@literal indices} API.
*
* @param consumer never {@literal null}.
* @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index
* does not exist.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html"> Indices
* Put Mapping API on elastic.co</a>
*/
default Mono<Boolean> putMapping(Consumer<PutMappingRequest> consumer) {
PutMappingRequest request = new PutMappingRequest();
consumer.accept(request);
return updateMapping(request);
return putMapping(request);
}
/**
@ -895,8 +944,8 @@ public interface ReactiveElasticsearchClient {
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html"> Indices
* Put Mapping API on elastic.co</a>
*/
default Mono<Void> updateMapping(PutMappingRequest putMappingRequest) {
return updateMapping(HttpHeaders.EMPTY, putMappingRequest);
default Mono<Boolean> putMapping(PutMappingRequest putMappingRequest) {
return putMapping(HttpHeaders.EMPTY, putMappingRequest);
}
/**
@ -909,7 +958,7 @@ public interface ReactiveElasticsearchClient {
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html"> Indices
* Put Mapping API on elastic.co</a>
*/
Mono<Void> updateMapping(HttpHeaders headers, PutMappingRequest putMappingRequest);
Mono<Boolean> putMapping(HttpHeaders headers, PutMappingRequest putMappingRequest);
/**
* Execute the given {@link FlushRequest} against the {@literal indices} API.
@ -951,5 +1000,93 @@ public interface ReactiveElasticsearchClient {
* API on elastic.co</a>
*/
Mono<Void> flushIndex(HttpHeaders headers, FlushRequest flushRequest);
/**
* Execute the given {@link GetSettingsRequest} against the {@literal indices} API.
*
* @param consumer never {@literal null}.
* @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index
* does not exist.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html"> Indices
* Flush API on elastic.co</a>
* @since 4.1
*/
default Mono<GetSettingsResponse> getSettings(Consumer<GetSettingsRequest> consumer) {
GetSettingsRequest request = new GetSettingsRequest();
consumer.accept(request);
return getSettings(request);
}
/**
* Execute the given {@link GetSettingsRequest} against the {@literal indices} API.
*
* @param getSettingsRequest must not be {@literal null}.
* @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index
* does not exist.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html"> Indices
* Flush API on elastic.co</a>
* @since 4.1
*/
default Mono<GetSettingsResponse> getSettings(GetSettingsRequest getSettingsRequest) {
return getSettings(HttpHeaders.EMPTY, getSettingsRequest);
}
/**
* Execute the given {@link GetSettingsRequest} against the {@literal indices} API.
*
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
* @param getSettingsRequest must not be {@literal null}.
* @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index
* does not exist.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html"> Indices
* Flush API on elastic.co</a>
* @since 4.1
*/
Mono<GetSettingsResponse> getSettings(HttpHeaders headers, GetSettingsRequest getSettingsRequest);
/**
* Execute the given {@link GetMappingsRequest} against the {@literal indices} API.
*
* @param consumer never {@literal null}.
* @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index
* does not exist.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-mapping.html"> Indices
* Flush API on elastic.co</a>
* @since 4.1
*/
default Mono<GetMappingsResponse> getMapping(Consumer<GetMappingsRequest> consumer) {
GetMappingsRequest request = new GetMappingsRequest();
consumer.accept(request);
return getMapping(request);
}
/**
* Execute the given {@link GetMappingsRequest} against the {@literal indices} API.
*
* @param getMappingsRequest must not be {@literal null}.
* @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index
* does not exist.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-mapping.html"> Indices
* Flush API on elastic.co</a>
* @since 4.1
*/
default Mono<GetMappingsResponse> getMapping(GetMappingsRequest getMappingsRequest) {
return getMapping(HttpHeaders.EMPTY, getMappingsRequest);
}
/**
* Execute the given {@link GetMappingsRequest} against the {@literal indices} API.
*
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
* @param getMappingsRequest must not be {@literal null}.
* @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index
* does not exist.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-mapping.html"> Indices
* Flush API on elastic.co</a>
* @since 4.1
*/
Mono<GetMappingsResponse> getMapping(HttpHeaders headers, GetMappingsRequest getMappingsRequest);
}
}

View File

@ -8,9 +8,11 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
@ -24,7 +26,7 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.springframework.data.elasticsearch.ElasticsearchException;
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
import org.springframework.data.elasticsearch.client.util.RequestConverters;
/**
@ -88,7 +90,7 @@ public interface RequestCreator {
try {
return RequestConverters.bulk(request);
} catch (IOException e) {
throw new ElasticsearchException("Could not parse request", e);
throw new UncategorizedElasticsearchException("Could not parse request", e);
}
};
}
@ -131,4 +133,18 @@ public interface RequestCreator {
return RequestConverters::count;
}
/**
* @since 4.1
*/
default Function<GetSettingsRequest, Request> getSettings() {
return RequestConverters::getSettings;
}
/**
* @since 4.1
*/
default Function<GetMappingsRequest, Request> getMapping() {
return RequestConverters::getMapping;
}
}

View File

@ -38,9 +38,11 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.explain.ExplainRequest;
@ -751,12 +753,12 @@ public class RequestConverters {
}
Request request = new Request(HttpMethod.PUT.name(),
RequestConverters.endpoint(putMappingRequest.indices(), "_mapping", putMappingRequest.type()));
RequestConverters.endpoint(putMappingRequest.indices(), "_mapping"));
RequestConverters.Params parameters = new RequestConverters.Params(request) //
.withTimeout(putMappingRequest.timeout()) //
.withMasterTimeout(putMappingRequest.masterNodeTimeout()) //
.withIncludeTypeName(true);
.withIncludeTypeName(false);
request.setEntity(RequestConverters.createEntity(putMappingRequest, RequestConverters.REQUEST_BODY_CONTENT_TYPE));
return request;
}
@ -772,6 +774,34 @@ public class RequestConverters {
return request;
}
public static Request getMapping(GetMappingsRequest getMappingsRequest) {
String[] indices = getMappingsRequest.indices() == null ? Strings.EMPTY_ARRAY : getMappingsRequest.indices();
String[] types = getMappingsRequest.types() == null ? Strings.EMPTY_ARRAY : getMappingsRequest.types();
Request request = new Request(HttpMethod.GET.name(), RequestConverters.endpoint(indices, "_mapping", types));
RequestConverters.Params parameters = new RequestConverters.Params(request);
parameters.withMasterTimeout(getMappingsRequest.masterNodeTimeout());
parameters.withIndicesOptions(getMappingsRequest.indicesOptions());
parameters.withLocal(getMappingsRequest.local());
parameters.withIncludeTypeName(false);
return request;
}
public static Request getSettings(GetSettingsRequest getSettingsRequest) {
String[] indices = getSettingsRequest.indices() == null ? Strings.EMPTY_ARRAY : getSettingsRequest.indices();
String[] names = getSettingsRequest.names() == null ? Strings.EMPTY_ARRAY : getSettingsRequest.names();
Request request = new Request(HttpMethod.GET.name(), RequestConverters.endpoint(indices, "_settings", names));
RequestConverters.Params parameters = new RequestConverters.Params(request);
parameters.withIndicesOptions(getSettingsRequest.indicesOptions());
parameters.withLocal(getSettingsRequest.local());
parameters.withIncludeDefaults(getSettingsRequest.includeDefaults());
parameters.withMasterTimeout(getSettingsRequest.masterNodeTimeout());
return request;
}
static HttpEntity createEntity(ToXContent toXContent, XContentType xContentType) {
try {

View File

@ -17,13 +17,11 @@ package org.springframework.data.elasticsearch.core;
import static org.springframework.util.StringUtils.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.settings.Settings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -90,37 +88,22 @@ abstract class AbstractDefaultIndexOperations implements IndexOperations {
@Override
public boolean create() {
IndexCoordinates index;
Document settings = null;
if (boundClass != null) {
Class<?> clazz = boundClass;
ElasticsearchPersistentEntity<?> persistentEntity = getRequiredPersistentEntity(clazz);
index = persistentEntity.getIndexCoordinates();
if (clazz.isAnnotationPresent(Setting.class)) {
String settingPath = clazz.getAnnotation(Setting.class).settingPath();
if (hasText(settingPath)) {
String fileSettings = ResourceUtil.readFileFromClasspath(settingPath);
if (hasText(fileSettings)) {
settings = Document.parse(fileSettings);
}
} else {
LOGGER.info("settingPath in @Setting has to be defined. Using default instead.");
}
settings = loadSettings(settingPath);
}
if (settings == null) {
settings = getDefaultSettings(persistentEntity);
settings = getRequiredPersistentEntity(clazz).getDefaultSettings();
}
} else {
index = boundIndex;
}
// noinspection ConstantConditions
return doCreate(index, settings);
return doCreate(getIndexCoordinates(), settings);
}
@Override
@ -236,42 +219,41 @@ abstract class AbstractDefaultIndexOperations implements IndexOperations {
// endregion
// region Helper functions
private <T> Document getDefaultSettings(ElasticsearchPersistentEntity<T> persistentEntity) {
if (persistentEntity.isUseServerConfiguration()) {
return Document.create();
}
Map<String, String> map = new MapBuilder<String, String>()
.put("index.number_of_shards", String.valueOf(persistentEntity.getShards()))
.put("index.number_of_replicas", String.valueOf(persistentEntity.getReplicas()))
.put("index.refresh_interval", persistentEntity.getRefreshInterval())
.put("index.store.type", persistentEntity.getIndexStoreType()).map();
return Document.from(map);
}
ElasticsearchPersistentEntity<?> getRequiredPersistentEntity(Class<?> clazz) {
return elasticsearchConverter.getMappingContext().getRequiredPersistentEntity(clazz);
}
/**
* get the current {@link IndexCoordinates}. These may change over time when the entity class has a SpEL constructed
* index name. When this IndexOperations is not bound to a class, the bound IndexCoordinates are returned.
*
* @return IndexCoordinates
*/
protected IndexCoordinates getIndexCoordinates() {
if (boundClass != null) {
return getIndexCoordinatesFor(boundClass);
}
Assert.notNull(boundIndex, "boundIndex may not be null");
return boundIndex;
return (boundClass != null) ? getIndexCoordinatesFor(boundClass) : boundIndex;
}
public IndexCoordinates getIndexCoordinatesFor(Class<?> clazz) {
return getRequiredPersistentEntity(clazz).getIndexCoordinates();
}
protected Map<String, Object> convertSettingsResponseToMap(GetSettingsResponse response, String indexName) {
@Nullable
private Document loadSettings(String settingPath) {
if (hasText(settingPath)) {
String settingsFile = ResourceUtil.readFileFromClasspath(settingPath);
Map<String, Object> settings = new HashMap<>();
if (hasText(settingsFile)) {
return Document.parse(settingsFile);
}
} else {
LOGGER.info("settingPath in @Setting has to be defined. Using default instead.");
}
return null;
}
protected Document convertSettingsResponseToMap(GetSettingsResponse response, String indexName) {
Document settings = Document.create();
if (!response.getIndexToDefaultSettings().isEmpty()) {
Settings defaultSettings = response.getIndexToDefaultSettings().get(indexName);

View File

@ -0,0 +1,238 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core;
import static org.elasticsearch.client.Requests.*;
import static org.springframework.util.StringUtils.*;
import reactor.core.publisher.Mono;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.elasticsearch.NoSuchIndexException;
import org.springframework.data.elasticsearch.annotations.Mapping;
import org.springframework.data.elasticsearch.annotations.Setting;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.index.MappingBuilder;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* @author Peter-Josef Meisch
* @since 4.1
*/
class DefaultReactiveIndexOperations implements ReactiveIndexOperations {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultReactiveIndexOperations.class);
@Nullable private final Class<?> boundClass;
private final IndexCoordinates boundIndex;
private final RequestFactory requestFactory;
private final ReactiveElasticsearchOperations operations;
private final ElasticsearchConverter converter;
public DefaultReactiveIndexOperations(ReactiveElasticsearchOperations operations, IndexCoordinates index) {
Assert.notNull(operations, "operations must not be null");
Assert.notNull(index, "index must not be null");
this.operations = operations;
this.converter = operations.getElasticsearchConverter();
this.requestFactory = new RequestFactory(operations.getElasticsearchConverter());
this.boundClass = null;
this.boundIndex = index;
}
public DefaultReactiveIndexOperations(ReactiveElasticsearchOperations operations, Class<?> clazz) {
Assert.notNull(operations, "operations must not be null");
Assert.notNull(clazz, "clazz must not be null");
this.operations = operations;
this.converter = operations.getElasticsearchConverter();
this.requestFactory = new RequestFactory(operations.getElasticsearchConverter());
this.boundClass = clazz;
this.boundIndex = getIndexCoordinatesFor(clazz);
}
// region index management
@Override
public Mono<Boolean> create() {
String indexName = getIndexCoordinates().getIndexName();
Document settings = null;
if (boundClass != null) {
Class<?> clazz = boundClass;
if (clazz.isAnnotationPresent(Setting.class)) {
String settingPath = clazz.getAnnotation(Setting.class).settingPath();
return loadDocument(settingPath, "@Setting").flatMap(document -> doCreate(indexName, document));
}
settings = getRequiredPersistentEntity(clazz).getDefaultSettings();
}
return doCreate(indexName, settings);
}
@Override
public Mono<Boolean> create(Document settings) {
return doCreate(getIndexCoordinates().getIndexName(), settings);
}
private Mono<Boolean> doCreate(String indexName, @Nullable Document settings) {
CreateIndexRequest request = requestFactory.createIndexRequestReactive(getIndexCoordinates().getIndexName(),
settings);
return Mono.from(operations.executeWithIndicesClient(client -> client.createIndex(request)));
}
@Override
public Mono<Boolean> delete() {
return exists() //
.flatMap(exists -> {
if (exists) {
DeleteIndexRequest request = requestFactory.deleteIndexRequest(getIndexCoordinates());
return Mono.from(operations.executeWithIndicesClient(client -> client.deleteIndex(request)))
.onErrorResume(NoSuchIndexException.class, e -> Mono.just(false));
} else {
return Mono.just(false);
}
});
}
@Override
public Mono<Boolean> exists() {
GetIndexRequest request = requestFactory.getIndexRequestReactive(getIndexCoordinates().getIndexName());
return Mono.from(operations.executeWithIndicesClient(client -> client.existsIndex(request)));
}
@Override
public Mono<Void> refresh() {
return Mono.from(operations.executeWithIndicesClient(
client -> client.refreshIndex(refreshRequest(getIndexCoordinates().getIndexNames()))));
}
@Override
public Mono<Document> createMapping() {
return createMapping(checkForBoundClass());
}
@Override
public Mono<Document> createMapping(Class<?> clazz) {
if (clazz.isAnnotationPresent(Mapping.class)) {
String mappingPath = clazz.getAnnotation(Mapping.class).mappingPath();
return loadDocument(mappingPath, "@Mapping");
}
String mapping = new MappingBuilder(converter).buildPropertyMapping(clazz);
return Mono.just(Document.parse(mapping));
}
@Override
public Mono<Boolean> putMapping(Mono<Document> mapping) {
return mapping.map(document -> requestFactory.putMappingRequestReactive(getIndexCoordinates(), document)) //
.flatMap(request -> Mono.from(operations.executeWithIndicesClient(client -> client.putMapping(request))));
}
@Override
public Mono<Document> getMapping() {
IndexCoordinates indexCoordinates = getIndexCoordinates();
GetMappingsRequest request = requestFactory.getMappingRequestReactive(indexCoordinates);
return Mono.from(operations.executeWithIndicesClient(client -> client.getMapping(request)))
.flatMap(getMappingsResponse -> {
Document document = Document.create();
document.put("properties",
getMappingsResponse.mappings().get(indexCoordinates.getIndexName()).get("properties").getSourceAsMap());
return Mono.just(document);
});
}
@Override
public Mono<Document> getSettings(boolean includeDefaults) {
String indexName = getIndexCoordinates().getIndexName();
GetSettingsRequest request = requestFactory.getSettingsRequest(indexName, includeDefaults);
return Mono.from(operations.executeWithIndicesClient(client -> client.getSettings(request)))
.map(getSettingsResponse -> requestFactory.fromSettingsResponse(getSettingsResponse, indexName));
}
// endregion
// region helper functions
/**
* get the current {@link IndexCoordinates}. These may change over time when the entity class has a SpEL constructed
* index name. When this IndexOperations is not bound to a class, the bound IndexCoordinates are returned.
*
* @return IndexCoordinates
*/
private IndexCoordinates getIndexCoordinates() {
return (boundClass != null) ? getIndexCoordinatesFor(boundClass) : boundIndex;
}
private IndexCoordinates getIndexCoordinatesFor(Class<?> clazz) {
return operations.getElasticsearchConverter().getMappingContext().getRequiredPersistentEntity(clazz)
.getIndexCoordinates();
}
private ElasticsearchPersistentEntity<?> getRequiredPersistentEntity(Class<?> clazz) {
return converter.getMappingContext().getRequiredPersistentEntity(clazz);
}
private Mono<Document> loadDocument(String path, String annotation) {
if (hasText(path)) {
return ReactiveResourceUtil.readFileFromClasspath(path).flatMap(s -> {
if (hasText(s)) {
return Mono.just(Document.parse(s));
} else {
return Mono.just(Document.create());
}
});
} else {
LOGGER.info("path in {} has to be defined. Using default instead.", annotation);
}
return Mono.just(Document.create());
}
private Class<?> checkForBoundClass() {
if (boundClass == null) {
throw new InvalidDataAccessApiUsageException("IndexOperations are not bound");
}
return boundClass;
}
// endregion
}

View File

@ -146,7 +146,7 @@ class DefaultTransportIndexOperations extends AbstractDefaultIndexOperations imp
.getSettings(getSettingsRequest) //
.actionGet();
return convertSettingsResponseToMap(response, getSettingsRequest.indices()[0]);
return requestFactory.fromSettingsResponse(response, index.getIndexName());
}
@Override

View File

@ -23,7 +23,6 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.rest.RestStatus;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.dao.DataIntegrityViolationException;
@ -105,10 +104,15 @@ public class ElasticsearchExceptionTranslator implements PersistenceExceptionTra
List<String> metadata = ex.getMetadata("es.index_uuid");
if (metadata == null) {
if (ex.getCause() instanceof ElasticsearchException) {
return indexAvailable((ElasticsearchException) ex.getCause());
}
if (ex instanceof ElasticsearchStatusException) {
return StringUtils.hasText(ObjectUtils.nullSafeToString(ex.getIndex()));
}
return false;
return true;
}
return !CollectionUtils.contains(metadata.iterator(), "_na_");
}

View File

@ -49,7 +49,7 @@ public interface ElasticsearchOperations extends DocumentOperations, SearchOpera
IndexOperations indexOps(Class<?> clazz);
/**
* get an {@link IndexOperations} that is bound to the given class
* get an {@link IndexOperations} that is bound to the given index
*
* @return IndexOperations
*/

View File

@ -85,6 +85,14 @@ public interface IndexOperations {
*/
Document createMapping(Class<?> clazz);
/**
* Writes the mapping to the index for the class this IndexOperations is bound to.
* @return {@literal true} if the mapping could be stored
* @since 4.1
*/
default boolean putMapping() {
return putMapping(createMapping());}
/**
* writes a mapping to the index
*
@ -141,9 +149,9 @@ public interface IndexOperations {
Map<String, Object> getSettings();
/**
* Get settings for a given indexName.
* Get the index settings.
*
* @param includeDefaults wehther or not to include all the default settings
* @param includeDefaults whether or not to include all the default settings
* @return the settings
*/
Map<String, Object> getSettings(boolean includeDefaults);

View File

@ -40,11 +40,21 @@ public interface ReactiveElasticsearchOperations extends ReactiveDocumentOperati
* Execute within a {@link ClientCallback} managing resources and translating errors.
*
* @param callback must not be {@literal null}.
* @param <T>
* @param <T> the type the Publisher emits
* @return the {@link Publisher} emitting results.
*/
<T> Publisher<T> execute(ClientCallback<Publisher<T>> callback);
/**
* Execute within a {@link IndicesClientCallback} managing resources and translating errors.
*
* @param callback must not be {@literal null}.
* @param <T> the type the Publisher emits
* @return the {@link Publisher} emitting results.
* @since 4.1
*/
<T> Publisher<T> executeWithIndicesClient(IndicesClientCallback<Publisher<T>> callback);
/**
* Get the {@link ElasticsearchConverter} used.
*
@ -62,6 +72,22 @@ public interface ReactiveElasticsearchOperations extends ReactiveDocumentOperati
*/
IndexCoordinates getIndexCoordinatesFor(Class<?> clazz);
/**
* Creates a {@link ReactiveIndexOperations} that is bound to the given index
* @param index IndexCoordinates specifying the index
* @return ReactiveIndexOperations implementation
* @since 4.1
*/
ReactiveIndexOperations indexOps(IndexCoordinates index);
/**
* Creates a {@link ReactiveIndexOperations} that is bound to the given class
* @param clazz the entity clazz specifiying the index information
* @return ReactiveIndexOperations implementation
* @since 4.1
*/
ReactiveIndexOperations indexOps(Class<?> clazz);
/**
* Callback interface to be used with {@link #execute(ClientCallback)} for operating directly on
* {@link ReactiveElasticsearchClient}.
@ -74,4 +100,15 @@ public interface ReactiveElasticsearchOperations extends ReactiveDocumentOperati
T doWithClient(ReactiveElasticsearchClient client);
}
/**
* Callback interface to be used with {@link #executeWithIndicesClient(IndicesClientCallback)} for operating directly on
* {@link ReactiveElasticsearchClient.Indices}.
*
* @param <T> the return type
* @since 4.1
*/
interface IndicesClientCallback<T extends Publisher<?>> {
T doWithClient(ReactiveElasticsearchClient.Indices client);
}
}

View File

@ -144,6 +144,32 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
}
}
/**
* Set the default {@link RefreshPolicy} to apply when writing to Elasticsearch.
*
* @param refreshPolicy can be {@literal null}.
*/
public void setRefreshPolicy(@Nullable RefreshPolicy refreshPolicy) {
this.refreshPolicy = refreshPolicy;
}
/**
* @return the current {@link RefreshPolicy}.
*/
@Nullable
public RefreshPolicy getRefreshPolicy() {
return refreshPolicy;
}
/**
* Set the default {@link IndicesOptions} for {@link SearchRequest search requests}.
*
* @param indicesOptions can be {@literal null}.
*/
public void setIndicesOptions(@Nullable IndicesOptions indicesOptions) {
this.indicesOptions = indicesOptions;
}
/**
* Set the {@link ReactiveEntityCallbacks} instance to use when invoking {@link ReactiveEntityCallbacks callbacks}
* like the {@link ReactiveBeforeConvertCallback}.
@ -160,7 +186,6 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
this.entityCallbacks = entityCallbacks;
}
// endregion
// region DocumentOperations
@ -199,25 +224,29 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
Assert.notNull(entitiesPublisher, "Entities must not be null!");
return entitiesPublisher.flatMapMany(entities -> {
return Flux.fromIterable(entities) //
.concatMap(entity -> maybeCallBeforeConvert(entity, index));
}).collectList().map(Entities::new).flatMapMany(entities -> {
if (entities.isEmpty()) {
return Flux.empty();
}
return entitiesPublisher //
.flatMapMany(entities -> Flux.fromIterable(entities) //
.concatMap(entity -> maybeCallBeforeConvert(entity, index)) //
).collectList() //
.map(Entities::new) //
.flatMapMany(entities -> {
return doBulkOperation(entities.indexQueries(), BulkOptions.defaultOptions(), index) //
.index().flatMap(indexAndResponse -> {
T savedEntity = entities.entityAt(indexAndResponse.getT1());
BulkItemResponse bulkItemResponse = indexAndResponse.getT2();
if (entities.isEmpty()) {
return Flux.empty();
}
AdaptibleEntity<T> adaptibleEntity = operations.forEntity(savedEntity, converter.getConversionService());
adaptibleEntity.populateIdIfNecessary(bulkItemResponse.getResponse().getId());
return doBulkOperation(entities.indexQueries(), BulkOptions.defaultOptions(), index) //
.index().flatMap(indexAndResponse -> {
T savedEntity = entities.entityAt(indexAndResponse.getT1());
BulkItemResponse bulkItemResponse = indexAndResponse.getT2();
return maybeCallAfterSave(savedEntity, index);
});
});
AdaptibleEntity<T> adaptibleEntity = operations.forEntity(savedEntity,
converter.getConversionService());
adaptibleEntity.populateIdIfNecessary(bulkItemResponse.getResponse().getId());
return maybeCallAfterSave(savedEntity, index);
});
});
}
@Override
@ -730,44 +759,31 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
// endregion
// Property Setters / Getters
/**
* Set the default {@link RefreshPolicy} to apply when writing to Elasticsearch.
*
* @param refreshPolicy can be {@literal null}.
*/
public void setRefreshPolicy(@Nullable RefreshPolicy refreshPolicy) {
this.refreshPolicy = refreshPolicy;
}
/**
* Set the default {@link IndicesOptions} for {@link SearchRequest search requests}.
*
* @param indicesOptions can be {@literal null}.
*/
public void setIndicesOptions(@Nullable IndicesOptions indicesOptions) {
this.indicesOptions = indicesOptions;
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations#exctute(ClientCallback)
*/
@Override
public <T> Publisher<T> execute(ClientCallback<Publisher<T>> callback) {
return Flux.defer(() -> callback.doWithClient(getClient())).onErrorMap(this::translateException);
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations#getElasticsearchConverter()
*/
@Override
public <T> Publisher<T> executeWithIndicesClient(IndicesClientCallback<Publisher<T>> callback) {
return Flux.defer(() -> callback.doWithClient(getIndicesClient())).onErrorMap(this::translateException);
}
@Override
public ElasticsearchConverter getElasticsearchConverter() {
return converter;
}
@Override
public ReactiveIndexOperations indexOps(IndexCoordinates index) {
return new DefaultReactiveIndexOperations(this, index);
}
@Override
public ReactiveIndexOperations indexOps(Class<?> clazz) {
return new DefaultReactiveIndexOperations(this, clazz);
}
@Override
public IndexCoordinates getIndexCoordinatesFor(Class<?> clazz) {
return getPersistentEntityFor(clazz).getIndexCoordinates();
@ -788,6 +804,20 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
return this.client;
}
/**
* Obtain the {@link ReactiveElasticsearchClient.Indices} to operate upon.
*
* @return never {@literal null}.
*/
protected ReactiveElasticsearchClient.Indices getIndicesClient() {
if (client instanceof ReactiveElasticsearchClient.Indices) {
return (ReactiveElasticsearchClient.Indices) client;
}
throw new UncategorizedElasticsearchException("No ReactiveElasticsearchClient.Indices implementation available");
}
// endregion
/**

View File

@ -0,0 +1,134 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core;
import reactor.core.publisher.Mono;
import org.springframework.data.elasticsearch.core.document.Document;
/**
* Interface defining operations on indexes for the reactive stack.
*
* @author Peter-Josef Meisch
* @since 4.1
*/
public interface ReactiveIndexOperations {
/**
* Create an index.
*
* @return a {@link Mono} signalling successful operation completion or an {@link Mono#error(Throwable) error} if eg.
* the index already exist.
*/
Mono<Boolean> create();
/**
* Create an index with the specified settings.
*
* @param settings index settings
* @return a {@link Mono} signalling successful operation completion or an {@link Mono#error(Throwable) error} if eg.
* the index already exist.
*/
Mono<Boolean> create(Document settings);
/**
* Delete an index.
*
* @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error}. If the index does
* not exist, a value of {@literal false is emitted}.
*/
Mono<Boolean> delete();
/**
* checks if an index exists
*
* @return a {@link Mono} with the result of exist check
*/
Mono<Boolean> exists();
/**
* Refresh the index(es) this IndexOperations is bound to
*
* @return a {@link Mono} signalling operation completion.
*/
Mono<Void> refresh();
/**
* Creates the index mapping for the entity this IndexOperations is bound to.
*
* @return mapping object
*/
Mono<Document> createMapping();
/**
* Creates the index mapping for the given class
*
* @param clazz the clazz to create a mapping for
* @return a {@link Mono} with the mapping document
*/
Mono<Document> createMapping(Class<?> clazz);
/**
* Writes the mapping to the index for the class this IndexOperations is bound to.
*
* @return {@literal true} if the mapping could be stored
*/
default Mono<Boolean> putMapping() {
return putMapping(createMapping());
}
/**
* writes a mapping to the index
*
* @param mapping the Document with the mapping definitions
* @return {@literal true} if the mapping could be stored
*/
Mono<Boolean> putMapping(Mono<Document> mapping);
/**
* Creates the index mapping for the given class and writes it to the index.
*
* @param clazz the clazz to create a mapping for
* @return {@literal true} if the mapping could be stored
*/
default Mono<Boolean> putMapping(Class<?> clazz) {
return putMapping(createMapping(clazz));
}
/**
* Get mapping for the index targeted defined by this {@link ReactiveIndexOperations}
*
* @return the mapping
*/
Mono<Document> getMapping();
/**
* get the settings for the index
*
* @return a {@link Mono} with a {@link Document} containing the index settings
*/
default Mono<Document> getSettings() {
return getSettings(false);
}
/**
* get the settings for the index
*
* @param includeDefaults whether or not to include all the default settings
* @return a {@link Mono} with a {@link Document} containing the index settings
*/
Mono<Document> getSettings(boolean includeDefaults);
}

View File

@ -0,0 +1,81 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core;
import reactor.core.publisher.Mono;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
/**
* Utility to reactively read {@link org.springframework.core.io.Resource}s.
*
* @author Peter-Josef Meisch
* @since 4.1
*/
public abstract class ReactiveResourceUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveResourceUtil.class);
private static final int BUFFER_SIZE = 8_192;
/**
* Read a {@link ClassPathResource} into a {@link reactor.core.publisher.Mono<String>}.
*
* @param url the resource to read
* @return a {@link reactor.core.publisher.Mono} emitting the resources content or an empty Mono on error
*/
public static Mono<String> readFileFromClasspath(String url) {
return DataBufferUtils
.join(DataBufferUtils.read(new ClassPathResource(url), new DefaultDataBufferFactory(), BUFFER_SIZE))
.<String> handle((it, sink) -> {
try (InputStream is = it.asInputStream();
InputStreamReader in = new InputStreamReader(is, Charset.defaultCharset());
BufferedReader br = new BufferedReader(in)) {
StringBuilder sb = new StringBuilder();
String line;
while ((line = br.readLine()) != null) {
sb.append(line);
}
sink.next(sb.toString());
sink.complete();
} catch (Exception e) {
LOGGER.debug(String.format("Failed to load file from url: %s: %s", url, e.getMessage()));
sink.complete();
} finally {
DataBufferUtils.release(it);
}
}).onErrorResume(throwable -> {
LOGGER.debug(String.format("Failed to load file from url: %s: %s", url, throwable.getMessage()));
return Mono.empty();
});
}
// Utility constructor
private ReactiveResourceUtil() {}
}

View File

@ -29,8 +29,11 @@ import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.bulk.BulkRequest;
@ -52,9 +55,11 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.GetMappingsRequest;
import org.elasticsearch.client.indices.PutMappingRequest;
import org.elasticsearch.common.geo.GeoDistance;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.VersionType;
@ -246,11 +251,38 @@ class RequestFactory {
// endregion
// region index management
/**
* creates a CreateIndexRequest from the rest-high-level-client library.
*
* @param index name of the index
* @param settings optional settings
* @return request
*/
@SuppressWarnings("unchecked")
public CreateIndexRequest createIndexRequest(IndexCoordinates index, @Nullable Document settings) {
CreateIndexRequest request = new CreateIndexRequest(index.getIndexName());
if (settings != null) {
if (settings != null && !settings.isEmpty()) {
request.settings(settings);
}
return request;
}
/**
* creates a CreateIndexRequest from the elasticsearch library, used by the reactive methods.
*
* @param indexName name of the index
* @param settings optional settings
* @return request
*/
public org.elasticsearch.action.admin.indices.create.CreateIndexRequest createIndexRequestReactive(String indexName,
@Nullable Document settings) {
org.elasticsearch.action.admin.indices.create.CreateIndexRequest request = new org.elasticsearch.action.admin.indices.create.CreateIndexRequest(
indexName);
request.index(indexName);
if (settings != null && !settings.isEmpty()) {
request.settings(settings);
}
return request;
@ -268,10 +300,30 @@ class RequestFactory {
return createIndexRequestBuilder;
}
/**
* creates a GetIndexRequest from the rest-high-level-client library.
*
* @param index name of the index
* @return request
*/
public GetIndexRequest getIndexRequest(IndexCoordinates index) {
return new GetIndexRequest(index.getIndexNames());
}
/**
* creates a CreateIndexRequest from the elasticsearch library, used by the reactive methods.
*
* @param indexName name of the index
* @return request
*/
public org.elasticsearch.action.admin.indices.get.GetIndexRequest getIndexRequestReactive(String indexName) {
org.elasticsearch.action.admin.indices.get.GetIndexRequest request = new org.elasticsearch.action.admin.indices.get.GetIndexRequest();
request.indices(indexName);
return request;
}
public IndicesExistsRequest indicesExistsRequest(IndexCoordinates index) {
String[] indexNames = index.getIndexNames();
@ -303,6 +355,15 @@ class RequestFactory {
return request;
}
public org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest putMappingRequestReactive(
IndexCoordinates index, Document mapping) {
org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest request = new org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest(
index.getIndexName());
request.type("not-used-but-must-be-there");
request.source(mapping);
return request;
}
public PutMappingRequestBuilder putMappingRequestBuilder(Client client, IndexCoordinates index, Document mapping) {
String[] indexNames = index.getIndexNames();
@ -312,6 +373,18 @@ class RequestFactory {
return requestBuilder;
}
public org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest getMappingRequestReactive(
IndexCoordinates index) {
org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest request = new org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest();
request.indices(index.getIndexName());
return request;
}
public GetSettingsRequest getSettingsRequest(String indexName, boolean includeDefaults) {
return new GetSettingsRequest().indices(indexName).includeDefaults(includeDefaults);
}
public GetMappingsRequest getMappingsRequest(IndexCoordinates index) {
String[] indexNames = index.getIndexNames();
@ -337,11 +410,13 @@ class RequestFactory {
.setAbortOnVersionConflict(false) //
.setRefresh(true);
if (deleteQuery.getPageSize() != null)
if (deleteQuery.getPageSize() != null) {
deleteByQueryRequest.setBatchSize(deleteQuery.getPageSize());
}
if (deleteQuery.getScrollTimeInMillis() != null)
if (deleteQuery.getScrollTimeInMillis() != null) {
deleteByQueryRequest.setScroll(TimeValue.timeValueMillis(deleteQuery.getScrollTimeInMillis()));
}
return deleteByQueryRequest;
}
@ -388,8 +463,9 @@ class RequestFactory {
SearchRequestBuilder source = requestBuilder.source();
if (deleteQuery.getScrollTimeInMillis() != null)
if (deleteQuery.getScrollTimeInMillis() != null) {
source.setScroll(TimeValue.timeValueMillis(deleteQuery.getScrollTimeInMillis()));
}
return requestBuilder;
}
@ -721,7 +797,9 @@ class RequestFactory {
request.preference(query.getPreference());
}
request.searchType(query.getSearchType());
if (query.getSearchType() != null) {
request.searchType(query.getSearchType());
}
prepareSort(query, sourceBuilder, getPersistentEntity(clazz));
@ -1131,6 +1209,38 @@ class RequestFactory {
return elasticsearchFilter;
}
// region response stuff
/**
* extract the index settings information for a given index
*
* @param response the Elasticsearch response
* @param indexName the index name
* @return settings as {@link Document}
*/
public Document fromSettingsResponse(GetSettingsResponse response, String indexName) {
Document settings = Document.create();
if (!response.getIndexToDefaultSettings().isEmpty()) {
Settings defaultSettings = response.getIndexToDefaultSettings().get(indexName);
for (String key : defaultSettings.keySet()) {
settings.put(key, defaultSettings.get(key));
}
}
if (!response.getIndexToSettings().isEmpty()) {
Settings customSettings = response.getIndexToSettings().get(indexName);
for (String key : customSettings.keySet()) {
settings.put(key, customSettings.get(key));
}
}
return settings;
}
// endregion
// region helper functions
@Nullable
private ElasticsearchPersistentEntity<?> getPersistentEntity(@Nullable Class<?> clazz) {
return clazz != null ? elasticsearchConverter.getMappingContext().getPersistentEntity(clazz) : null;
@ -1162,6 +1272,7 @@ class RequestFactory {
String[] valuesAsArray = new String[values.size()];
return values.toArray(valuesAsArray);
}
// endregion
private boolean hasSeqNoPrimaryTermProperty(@Nullable Class<?> entityClass) {

View File

@ -17,6 +17,7 @@ package org.springframework.data.elasticsearch.core.mapping;
import org.elasticsearch.index.VersionType;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.query.SeqNoPrimaryTerm;
import org.springframework.data.mapping.PersistentEntity;
import org.springframework.lang.Nullable;
@ -137,4 +138,12 @@ public interface ElasticsearchPersistentEntity<T> extends PersistentEntity<T, El
String.format("Required SeqNoPrimaryTerm property not found for %s!", this.getType()));
}
}
/**
* returns the default settings for an index.
*
* @return settings as {@link Document}
* @since 4.1
*/
Document getDefaultSettings();
}

View File

@ -19,12 +19,13 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.index.VersionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Parent;
import org.springframework.data.elasticsearch.annotations.Setting;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.mapping.MappingException;
import org.springframework.data.mapping.PropertyHandler;
import org.springframework.data.mapping.model.BasicPersistentEntity;
@ -77,8 +78,9 @@ public class SimpleElasticsearchPersistentEntity<T> extends BasicPersistentEntit
super(typeInformation);
Class<T> clazz = typeInformation.getType();
if (clazz.isAnnotationPresent(Document.class)) {
Document document = clazz.getAnnotation(Document.class);
if (clazz.isAnnotationPresent(org.springframework.data.elasticsearch.annotations.Document.class)) {
org.springframework.data.elasticsearch.annotations.Document document = clazz
.getAnnotation(org.springframework.data.elasticsearch.annotations.Document.class);
Assert.hasText(document.indexName(),
" Unknown indexName. Make sure the indexName is defined. e.g @Document(indexName=\"foo\")");
this.indexName = document.indexName();
@ -317,4 +319,19 @@ public class SimpleElasticsearchPersistentEntity<T> extends BasicPersistentEntit
return resolvedName != null ? resolvedName : name;
}
// endregion
@Override
public Document getDefaultSettings() {
if (isUseServerConfiguration()) {
return Document.create();
}
Map<String, String> map = new MapBuilder<String, String>()
.put("index.number_of_shards", String.valueOf(getShards()))
.put("index.number_of_replicas", String.valueOf(getReplicas()))
.put("index.refresh_interval", getRefreshInterval()).put("index.store.type", getIndexStoreType()).map();
return Document.from(map);
}
}

View File

@ -85,25 +85,18 @@ public class SimpleElasticsearchRepository<T, ID> implements ElasticsearchReposi
this.entityInformation = metadata;
this.entityClass = this.entityInformation.getJavaType();
this.indexOperations = operations.indexOps(this.entityClass);
try {
if (createIndexAndMapping() && !indexOperations.exists()) {
createIndex();
putMapping();
if (shouldCreateIndexAndMapping() && !indexOperations.exists()) {
indexOperations.create();
indexOperations.putMapping(entityClass);
}
} catch (Exception exception) {
LOGGER.warn("Cannot create index: {}", exception.getMessage());
}
}
private void createIndex() {
indexOperations.create();
}
private void putMapping() {
indexOperations.putMapping(entityClass);
}
private boolean createIndexAndMapping() {
private boolean shouldCreateIndexAndMapping() {
final ElasticsearchPersistentEntity<?> entity = operations.getElasticsearchConverter().getMappingContext()
.getRequiredPersistentEntity(entityClass);

View File

@ -18,12 +18,18 @@ package org.springframework.data.elasticsearch.repository.support;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.index.query.QueryBuilders;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.ReactiveIndexOperations;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.Query;
@ -39,24 +45,53 @@ import org.springframework.util.Assert;
*/
public class SimpleReactiveElasticsearchRepository<T, ID> implements ReactiveElasticsearchRepository<T, ID> {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleElasticsearchRepository.class);
private final ElasticsearchEntityInformation<T, ID> entityInformation;
private final ReactiveElasticsearchOperations elasticsearchOperations;
private final ReactiveElasticsearchOperations operations;
private final ReactiveIndexOperations indexOperations;
public SimpleReactiveElasticsearchRepository(ElasticsearchEntityInformation<T, ID> entityInformation,
ReactiveElasticsearchOperations elasticsearchOperations) {
ReactiveElasticsearchOperations operations) {
Assert.notNull(entityInformation, "EntityInformation must not be null!");
Assert.notNull(elasticsearchOperations, "ElasticsearchOperations must not be null!");
Assert.notNull(operations, "ElasticsearchOperations must not be null!");
this.entityInformation = entityInformation;
this.elasticsearchOperations = elasticsearchOperations;
this.operations = operations;
this.indexOperations = operations.indexOps(entityInformation.getJavaType());
createIndexAndMappingIfNeeded();
}
private void createIndexAndMappingIfNeeded() {
try {
if (shouldCreateIndexAndMapping()) {
indexOperations.exists() //
.flatMap(exists -> exists ? Mono.empty() : indexOperations.create()) //
.flatMap(success -> success ? indexOperations.putMapping() : Mono.empty()) //
.block();
}
} catch (Exception exception) {
LOGGER.warn("Cannot create index: {}", exception.getMessage());
}
}
private boolean shouldCreateIndexAndMapping() {
final ElasticsearchPersistentEntity<?> entity = operations.getElasticsearchConverter().getMappingContext()
.getRequiredPersistentEntity(entityInformation.getJavaType());
return entity.isCreateIndexAndMapping();
}
@Override
public <S extends T> Mono<S> save(S entity) {
Assert.notNull(entity, "Entity must not be null!");
return elasticsearchOperations.save(entity, entityInformation.getIndexCoordinates());
return operations.save(entity, entityInformation.getIndexCoordinates())
.flatMap(saved -> doRefresh().thenReturn(saved));
}
@Override
@ -71,16 +106,15 @@ public class SimpleReactiveElasticsearchRepository<T, ID> implements ReactiveEla
Assert.notNull(entityStream, "EntityStream must not be null!");
return elasticsearchOperations.saveAll(Flux.from(entityStream).collectList(),
entityInformation.getIndexCoordinates());
return operations.saveAll(Flux.from(entityStream).collectList(), entityInformation.getIndexCoordinates())
.concatWith(doRefresh().then(Mono.empty()));
}
@Override
public Mono<T> findById(ID id) {
Assert.notNull(id, "Id must not be null!");
return elasticsearchOperations.get(convertId(id), entityInformation.getJavaType(),
entityInformation.getIndexCoordinates());
return operations.get(convertId(id), entityInformation.getJavaType(), entityInformation.getIndexCoordinates());
}
@Override
@ -94,8 +128,7 @@ public class SimpleReactiveElasticsearchRepository<T, ID> implements ReactiveEla
public Mono<Boolean> existsById(ID id) {
Assert.notNull(id, "Id must not be null!");
return elasticsearchOperations.exists(convertId(id), entityInformation.getJavaType(),
entityInformation.getIndexCoordinates());
return operations.exists(convertId(id), entityInformation.getIndexCoordinates());
}
@Override
@ -108,14 +141,14 @@ public class SimpleReactiveElasticsearchRepository<T, ID> implements ReactiveEla
@Override
public Flux<T> findAll() {
return elasticsearchOperations.search(Query.findAll().setPageable(Pageable.unpaged()),
entityInformation.getJavaType(), entityInformation.getIndexCoordinates()).map(SearchHit::getContent);
return operations.search(Query.findAll().setPageable(Pageable.unpaged()), entityInformation.getJavaType(),
entityInformation.getIndexCoordinates()).map(SearchHit::getContent);
}
@Override
public Flux<T> findAll(Sort sort) {
return elasticsearchOperations.search(Query.findAll().addSort(sort).setPageable(Pageable.unpaged()),
return operations.search(Query.findAll().addSort(sort).setPageable(Pageable.unpaged()),
entityInformation.getJavaType(), entityInformation.getIndexCoordinates()).map(SearchHit::getContent);
}
@ -138,24 +171,22 @@ public class SimpleReactiveElasticsearchRepository<T, ID> implements ReactiveEla
.flatMapMany(query -> {
IndexCoordinates index = entityInformation.getIndexCoordinates();
return elasticsearchOperations.multiGet(query, entityInformation.getJavaType(), index);
return operations.multiGet(query, entityInformation.getJavaType(), index);
});
}
@Override
public Mono<Long> count() {
return elasticsearchOperations.count(Query.findAll(), entityInformation.getJavaType(),
entityInformation.getIndexCoordinates());
return operations.count(Query.findAll(), entityInformation.getJavaType(), entityInformation.getIndexCoordinates());
}
@Override
public Mono<Void> deleteById(ID id) {
Assert.notNull(id, "Id must not be null!");
return elasticsearchOperations
.delete(convertId(id), entityInformation.getIndexCoordinates()) //
.then();
return operations.delete(convertId(id), entityInformation.getIndexCoordinates()) //
.then(doRefresh());
}
@Override
@ -169,8 +200,8 @@ public class SimpleReactiveElasticsearchRepository<T, ID> implements ReactiveEla
public Mono<Void> delete(T entity) {
Assert.notNull(entity, "Entity must not be null!");
return elasticsearchOperations.delete(entity, entityInformation.getIndexCoordinates()) //
.then();
return operations.delete(entity, entityInformation.getIndexCoordinates()) //
.then(doRefresh());
}
@Override
@ -192,29 +223,37 @@ public class SimpleReactiveElasticsearchRepository<T, ID> implements ReactiveEla
throw new IllegalStateException("Entity id must not be null!");
}
return convertId(id);
}).collectList().map(objects -> {
return new StringQuery(QueryBuilders.idsQuery() //
.addIds(objects.toArray(new String[0])) //
.toString());
}) //
.flatMap(query -> {
return elasticsearchOperations.delete(query, entityInformation.getJavaType(),
entityInformation.getIndexCoordinates());
}) //
.then();
}).collectList().map(objects -> new StringQuery(QueryBuilders.idsQuery() //
.addIds(objects.toArray(new String[0])) //
.toString())) //
.flatMap(
query -> operations.delete(query, entityInformation.getJavaType(), entityInformation.getIndexCoordinates())) //
.then(doRefresh());
}
@Override
public Mono<Void> deleteAll() {
return elasticsearchOperations
.delete(Query.findAll(), entityInformation.getJavaType(), entityInformation.getIndexCoordinates()) //
.then();
return operations.delete(Query.findAll(), entityInformation.getJavaType(), entityInformation.getIndexCoordinates()) //
.then(doRefresh());
}
private String convertId(Object id) {
return elasticsearchOperations.getElasticsearchConverter().convertId(id);
return operations.getElasticsearchConverter().convertId(id);
}
private Mono<Void> doRefresh() {
RefreshPolicy refreshPolicy = null;
if (operations instanceof ReactiveElasticsearchTemplate) {
refreshPolicy = ((ReactiveElasticsearchTemplate) operations).getRefreshPolicy();
}
if (refreshPolicy == null || refreshPolicy == RefreshPolicy.NONE) {
return indexOperations.refresh();
}
return Mono.empty();
}
}

View File

@ -497,11 +497,12 @@ public class ReactiveElasticsearchClientTests {
.verifyComplete();
}
@Test // DATAES-569
@Test // DATAES-569, DATAES-678
public void createIndex() throws IOException {
client.indices().createIndex(request -> request.index(INDEX_I)) //
.as(StepVerifier::create) //
.expectNext(true) //
.verifyComplete();
assertThat(syncClient.indices().exists(new GetIndexRequest(INDEX_I), RequestOptions.DEFAULT)).isTrue();
@ -517,13 +518,14 @@ public class ReactiveElasticsearchClientTests {
.verifyError(ElasticsearchStatusException.class);
}
@Test // DATAES-569
@Test // DATAES-569, DATAES-678
public void deleteExistingIndex() throws IOException {
syncClient.indices().create(new CreateIndexRequest(INDEX_I), RequestOptions.DEFAULT);
client.indices().deleteIndex(request -> request.indices(INDEX_I)) //
.as(StepVerifier::create) //
.expectNext(true) //
.verifyComplete();
assertThat(syncClient.indices().exists(new GetIndexRequest(INDEX_I), RequestOptions.DEFAULT)).isFalse();
@ -601,6 +603,7 @@ public class ReactiveElasticsearchClientTests {
client.indices().updateMapping(request -> request.indices(INDEX_I).type(TYPE_I).source(jsonMap)) //
.as(StepVerifier::create) //
.expectNext(true) //
.verifyComplete();
}

View File

@ -847,6 +847,7 @@ public class ReactiveElasticsearchTemplateTests {
.expectNext(entity1) //
.expectNext(entity2) //
.verifyComplete();
indexOperations.refresh();
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build();
template.search(searchQuery, SampleEntity.class, IndexCoordinates.of(DEFAULT_INDEX)) //
@ -979,8 +980,6 @@ public class ReactiveElasticsearchTemplateTests {
}
}
// TODO: check field mapping !!!
// --> JUST some helpers
private SampleEntity randomEntity(String message) {

View File

@ -0,0 +1,358 @@
/*
* Copyright 2018-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core;
import static org.assertj.core.api.Assertions.*;
import static org.skyscreamer.jsonassert.JSONAssert.*;
import lombok.Data;
import reactor.test.StepVerifier;
import java.time.LocalDate;
import org.json.JSONException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.skyscreamer.jsonassert.JSONCompareMode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.DateFormat;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import org.springframework.data.elasticsearch.annotations.Mapping;
import org.springframework.data.elasticsearch.annotations.Setting;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.junit.jupiter.ElasticsearchRestTemplateConfiguration;
import org.springframework.data.elasticsearch.junit.jupiter.ReactiveElasticsearchRestTemplateConfiguration;
import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest;
import org.springframework.test.context.ContextConfiguration;
/**
* @author Peter-Josef Meisch
*/
@SpringIntegrationTest
@ContextConfiguration(classes = { ReactiveIndexOperationsTest.Config.class })
public class ReactiveIndexOperationsTest {
public static final String TESTINDEX = "reactive-index-operations-testindex";
@Configuration
@Import({ ReactiveElasticsearchRestTemplateConfiguration.class, ElasticsearchRestTemplateConfiguration.class })
static class Config {}
@Autowired private ReactiveElasticsearchOperations operations;
@BeforeEach
void setUp() {
deleteIndices();
}
@AfterEach
void tearDown() {
deleteIndices();
}
private void deleteIndices() {
operations.indexOps(IndexCoordinates.of(TESTINDEX + "*")).delete().block();
}
@Test // DATAES-678
void shouldCreateIndexOpsForIndexCoordinates() {
IndexCoordinates indexCoordinates = IndexCoordinates.of(TESTINDEX);
ReactiveIndexOperations indexOperations = operations.indexOps(indexCoordinates);
assertThat(indexOperations).isNotNull();
}
@Test // DATAES-678
void shouldCreateIndexOpsForEntityClass() {
ReactiveIndexOperations indexOperations = operations.indexOps(Entity.class);
assertThat(indexOperations).isNotNull();
}
@Test // DATAES-678
void shouldCreateIndexForName() {
ReactiveIndexOperations indexOps = operations.indexOps(IndexCoordinates.of(TESTINDEX + "-create"));
indexOps.create() //
.as(StepVerifier::create) //
.expectNext(true) //
.verifyComplete();
}
@Test // DATAES-678
void shouldCreateIndexForEntity() {
ReactiveIndexOperations indexOps = operations.indexOps(Entity.class);
indexOps.create() //
.as(StepVerifier::create) //
.expectNext(true) //
.verifyComplete();
// check the settings from the class annotation
indexOps.getSettings().as(StepVerifier::create).consumeNextWith(settings -> {
assertThat(settings.get("index.number_of_replicas")).isEqualTo("2");
assertThat(settings.get("index.number_of_shards")).isEqualTo("3");
assertThat(settings.get("index.refresh_interval")).isEqualTo("4s");
}).verifyComplete();
}
@Test // DATAES-678
void shouldCreateIndexWithGivenSettings() {
ReactiveIndexOperations indexOps = operations.indexOps(IndexCoordinates.of(TESTINDEX + "-create"));
org.springframework.data.elasticsearch.core.document.Document requiredSettings = org.springframework.data.elasticsearch.core.document.Document
.create();
requiredSettings.put("index.number_of_replicas", 3);
requiredSettings.put("index.number_of_shards", 4);
requiredSettings.put("index.refresh_interval", "5s");
indexOps.create(requiredSettings) //
.as(StepVerifier::create) //
.expectNext(true) //
.verifyComplete();
indexOps.getSettings().as(StepVerifier::create).consumeNextWith(settings -> {
assertThat(settings.get("index.number_of_replicas")).isEqualTo("3");
assertThat(settings.get("index.number_of_shards")).isEqualTo("4");
assertThat(settings.get("index.refresh_interval")).isEqualTo("5s");
}).verifyComplete();
}
@Test // DATAES-678
void shouldCreateIndexWithAnnotatedSettings() {
ReactiveIndexOperations indexOps = operations.indexOps(EntityWithAnnotatedSettingsAndMappings.class);
indexOps.create() //
.as(StepVerifier::create) //
.expectNext(true) //
.verifyComplete();
indexOps.getSettings().as(StepVerifier::create).consumeNextWith(settings -> {
assertThat(settings.get("index.number_of_replicas")).isEqualTo("0");
assertThat(settings.get("index.number_of_shards")).isEqualTo("1");
assertThat(settings.containsKey("index.analysis.analyzer.emailAnalyzer.tokenizer")).isTrue();
assertThat(settings.get("index.analysis.analyzer.emailAnalyzer.tokenizer")).isEqualTo("uax_url_email");
}).verifyComplete();
}
@Test // DATAES-678
public void shouldCreateIndexUsingServerDefaultConfiguration() {
ReactiveIndexOperations indexOps = operations.indexOps(EntityUseServerConfig.class);
indexOps.create() //
.as(StepVerifier::create) //
.expectNext(true) //
.verifyComplete();
// check the settings from the class annotation
indexOps.getSettings().as(StepVerifier::create).consumeNextWith(settings -> {
assertThat(settings.get("index.number_of_replicas")).isEqualTo("1");
assertThat(settings.get("index.number_of_shards")).isEqualTo("1");
}).verifyComplete();
}
@Test // DATAES-678
void shouldDeleteIfItExists() {
ReactiveIndexOperations indexOps = operations.indexOps(IndexCoordinates.of(TESTINDEX + "-delete"));
indexOps.create().block();
indexOps.delete() //
.as(StepVerifier::create) //
.expectNext(true) //
.verifyComplete();
}
@Test // DATAES-678
void shouldReturnFalseOnDeleteIfItDoesNotExist() {
ReactiveIndexOperations indexOps = operations.indexOps(IndexCoordinates.of(TESTINDEX + "-delete"));
indexOps.delete() //
.as(StepVerifier::create) //
.expectNext(false) //
.verifyComplete();
}
@Test // DATAES-678
void shouldReturnExistsTrueIfIndexDoesExist() {
ReactiveIndexOperations indexOps = operations.indexOps(IndexCoordinates.of(TESTINDEX + "-exists"));
indexOps.create().block();
indexOps.exists() //
.as(StepVerifier::create) //
.expectNext(true) //
.verifyComplete();
}
@Test // DATAES-678
void shouldReturnExistsFalseIfIndexDoesNotExist() {
ReactiveIndexOperations indexOps = operations.indexOps(IndexCoordinates.of(TESTINDEX + "-exists"));
indexOps.exists() //
.as(StepVerifier::create) //
.expectNext(false) //
.verifyComplete();
}
@Test // DATAES-678
void shouldCreateMappingForEntityFromProperties() {
ReactiveIndexOperations indexOps = operations.indexOps(IndexCoordinates.of(TESTINDEX + "-mappings"));
String expected = "{\n" + //
" \"properties\":{\n" + //
" \"text\": {\n" + //
" \"type\": \"text\"\n" + //
" },\n" + //
" \"publication-date\": {\n" + //
" \"type\": \"date\",\n" + //
" \"format\": \"basic_date\"\n" + //
" }\n" + //
" }\n" + //
"}\n"; //
indexOps.createMapping(Entity.class) //
.as(StepVerifier::create) //
.assertNext(document -> {
try {
assertEquals(expected, document.toJson(), JSONCompareMode.NON_EXTENSIBLE);
} catch (JSONException e) {
fail("", e);
}
}) //
.verifyComplete();
}
@Test // DATAES-678
void shouldCreateMappingForEntityFromMappingAnnotation() {
ReactiveIndexOperations indexOps = operations.indexOps(IndexCoordinates.of(TESTINDEX + "-mappings"));
String expected = "{\n" + //
" \"properties\": {\n" + //
" \"email\": {\n" + //
" \"type\": \"text\",\n" + //
" \"analyzer\": \"emailAnalyzer\"\n" + //
" }\n" + //
" }\n" + //
"}\n"; //
indexOps.createMapping(EntityWithAnnotatedSettingsAndMappings.class) //
.as(StepVerifier::create) //
.assertNext(document -> {
try {
assertEquals(expected, document.toJson(), JSONCompareMode.NON_EXTENSIBLE);
} catch (JSONException e) {
fail("", e);
}
}) //
.verifyComplete();
}
@Test // DATAES-678
void shouldCreateMappingBoundEntity() {
ReactiveIndexOperations indexOps = operations.indexOps(Entity.class);
String expected = "{\n" + //
" \"properties\":{\n" + //
" \"text\": {\n" + //
" \"type\": \"text\"\n" + //
" },\n" + //
" \"publication-date\": {\n" + //
" \"type\": \"date\",\n" + //
" \"format\": \"basic_date\"\n" + //
" }\n" + //
" }\n" + //
"}\n"; //
indexOps.createMapping() //
.as(StepVerifier::create) //
.assertNext(document -> {
try {
assertEquals(expected, document.toJson(), JSONCompareMode.NON_EXTENSIBLE);
} catch (JSONException e) {
fail("", e);
}
}) //
.verifyComplete();
}
@Test // DATAES-678
void shouldPutAndGetMapping() {
ReactiveIndexOperations indexOps = operations.indexOps(Entity.class);
String expected = "{\n" + //
" \"properties\":{\n" + //
" \"text\": {\n" + //
" \"type\": \"text\"\n" + //
" },\n" + //
" \"publication-date\": {\n" + //
" \"type\": \"date\",\n" + //
" \"format\": \"basic_date\"\n" + //
" }\n" + //
" }\n" + //
"}\n"; //
indexOps.create() //
.then(indexOps.putMapping()) //
.then(indexOps.getMapping()) //
.as(StepVerifier::create) //
.assertNext(document -> {
try {
assertEquals(expected, document.toJson(), JSONCompareMode.NON_EXTENSIBLE);
} catch (JSONException e) {
fail("", e);
}
}).verifyComplete();
}
@Data
@Document(indexName = TESTINDEX, shards = 3, replicas = 2, refreshInterval = "4s")
static class Entity {
@Id private String id;
@Field(type = FieldType.Text) private String text;
@Field(name = "publication-date", type = FieldType.Date,
format = DateFormat.basic_date) private LocalDate publicationDate;
}
@Data
@Document(indexName = TESTINDEX, useServerConfiguration = true)
static class EntityUseServerConfig {
@Id private String id;
}
@Data
@Document(indexName = TESTINDEX)
@Setting(settingPath = "/settings/test-settings.json")
@Mapping(mappingPath = "/mappings/test-mappings.json")
static class EntityWithAnnotatedSettingsAndMappings {
@Id private String id;
}
}

View File

@ -0,0 +1,68 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core;
import static org.assertj.core.api.Assertions.*;
import static org.skyscreamer.jsonassert.JSONAssert.*;
import reactor.test.StepVerifier;
import org.json.JSONException;
import org.junit.jupiter.api.Test;
/**
* @author Peter-Josef Meisch
*/
class ReactiveResourceUtilTest {
@Test
void shouldReadFromClasspath() {
String expected = "{\n" + //
" \"index\": {\n" + //
" \"number_of_shards\": \"1\",\n" + //
" \"number_of_replicas\": \"0\",\n" + //
" \"analysis\": {\n" + //
" \"analyzer\": {\n" + //
" \"emailAnalyzer\": {\n" + //
" \"type\": \"custom\",\n" + //
" \"tokenizer\": \"uax_url_email\"\n" + //
" }\n" + //
" }\n" + //
" }\n" + //
" }\n" + //
"}\n"; //
ReactiveResourceUtil.readFileFromClasspath("/settings/test-settings.json") //
.as(StepVerifier::create) //
.consumeNextWith(actual -> {
try {
assertEquals(expected, actual, false);
} catch (JSONException e) {
fail("", e);
}
}) //
.verifyComplete();
}
@Test
void shouldReturnEmptyMonoOnNonExistingResource() {
ReactiveResourceUtil.readFileFromClasspath("/this/should/really/not/exist") //
.as(StepVerifier::create) //
.verifyComplete();
}
}

View File

@ -48,7 +48,6 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
@ -59,7 +58,6 @@ import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.domain.Sort.Order;
import org.springframework.data.elasticsearch.TestUtils;
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.Highlight;
@ -359,11 +357,11 @@ public class SimpleReactiveElasticsearchRepositoryTests {
repository.deleteById("does-not-exist").as(StepVerifier::create).verifyComplete();
}
@Test // DATAES-519, DATAES-767, DATAES-822
public void deleteByIdShouldErrorWhenIndexDoesNotExist() {
@Test // DATAES-519, DATAES-767, DATAES-822, DATAES-678
public void deleteByIdShouldCompleteWhenIndexDoesNotExist() {
repository.deleteById("does-not-exist") //
.as(StepVerifier::create) //
.verifyError(UncategorizedElasticsearchException.class);
.verifyComplete();
}
@Test // DATAES-519