From 90c4a2a4d6954cf2a50101a10e0db1f10ca93103 Mon Sep 17 00:00:00 2001 From: Peter-Josef Meisch Date: Tue, 29 Dec 2020 13:39:28 +0100 Subject: [PATCH] DATAES-982 - Improve refresh handling. Original PR: #573. --- .../reference/elasticsearch-clients.adoc | 50 ++++++++---------- ...elasticsearch-migration-guide-4.1-4.2.adoc | 32 ++++++++++++ .../asciidoc/reference/migration-guides.adoc | 2 + .../AbstractElasticsearchConfiguration.java | 6 ++- ...actReactiveElasticsearchConfiguration.java | 6 +-- .../ElasticsearchConfigurationSupport.java | 19 +++++-- .../core/AbstractElasticsearchTemplate.java | 52 +++++++++++++++++++ .../core/ElasticsearchRestTemplate.java | 7 +-- .../core/ElasticsearchTemplate.java | 10 ++-- .../core/ReactiveElasticsearchTemplate.java | 3 +- .../elasticsearch/core/RefreshPolicy.java | 41 +++++++++++++++ .../elasticsearch/core/RequestFactory.java | 3 ++ .../elasticsearch/core/query/DeleteQuery.java | 5 +- .../elasticsearch/core/query/IndexQuery.java | 16 ++++++ .../core/query/IndexQueryBuilder.java | 16 ++---- .../SimpleElasticsearchRepository.java | 20 ++++++- ...SimpleReactiveElasticsearchRepository.java | 5 +- ...veElasticsearchClientIntegrationTests.java | 7 ++- ...ElasticsearchTemplateIntegrationTests.java | 16 ++---- ...eactiveElasticsearchTemplateUnitTests.java | 9 ++-- ...lasticsearchRestTemplateConfiguration.java | 6 +++ .../ElasticsearchTemplateConfiguration.java | 12 ++++- ...lasticsearchRestTemplateConfiguration.java | 6 +++ 23 files changed, 269 insertions(+), 80 deletions(-) create mode 100644 src/main/asciidoc/reference/elasticsearch-migration-guide-4.1-4.2.adoc create mode 100644 src/main/java/org/springframework/data/elasticsearch/core/RefreshPolicy.java diff --git a/src/main/asciidoc/reference/elasticsearch-clients.adoc b/src/main/asciidoc/reference/elasticsearch-clients.adoc index 67e9ac88e..f29266069 100644 --- a/src/main/asciidoc/reference/elasticsearch-clients.adoc +++ b/src/main/asciidoc/reference/elasticsearch-clients.adoc @@ -22,28 +22,32 @@ public class TransportClientConfig extends ElasticsearchConfigurationSupport { @Bean public Client elasticsearchClient() throws UnknownHostException { - Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build(); <1> + Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build(); <.> TransportClient client = new PreBuiltTransportClient(settings); - client.addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); <2> + client.addTransportAddress(new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300)); <.> return client; } @Bean(name = { "elasticsearchOperations", "elasticsearchTemplate" }) public ElasticsearchTemplate elasticsearchTemplate() throws UnknownHostException { - return new ElasticsearchTemplate(elasticsearchClient()); + + ElasticsearchTemplate template = new ElasticsearchTemplate(elasticsearchClient, elasticsearchConverter); + template.setRefreshPolicy(refreshPolicy()); <.> + + return template; } } // ... IndexRequest request = new IndexRequest("spring-data", "elasticsearch", randomID()) - .source(someObject) - .setRefreshPolicy(IMMEDIATE); + .source(someObject); IndexResponse response = client.index(request); ---- -<1> The `TransportClient` must be configured with the cluster name. -<2> The host and port to connect the client to. +<.> The `TransportClient` must be configured with the cluster name. +<.> The host and port to connect the client to. +<.> the RefreshPolicy must be set in the `ElasticsearchTemplate` (override `refreshPolicy()` to not use the default) ==== [[elasticsearch.clients.rest]] @@ -103,39 +107,29 @@ Calls are directly operated on the reactive stack, **not** wrapping async (threa ==== [source,java] ---- -static class Config { +@Configuration +public class ReactiveRestClientConfig extends AbstractReactiveElasticsearchConfiguration { - @Bean - ReactiveElasticsearchClient client() { - - ClientConfiguration clientConfiguration = ClientConfiguration.builder() <1> - .connectedTo("localhost:9200", "localhost:9291") - .withWebClientConfigurer(webClient -> { <2> - ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder() - .codecs(configurer -> configurer.defaultCodecs() - .maxInMemorySize(-1)) + @Override + @Bean + public ReactiveElasticsearchClient reactiveElasticsearchClient() { + final ClientConfiguration clientConfiguration = ClientConfiguration.builder() <.> + .connectedTo("localhost:9200") // .build(); - return webClient.mutate().exchangeStrategies(exchangeStrategies).build(); - }) - .build(); + return ReactiveRestClients.create(clientConfiguration); - return ReactiveRestClients.create(clientConfiguration); - } + } } - // ... Mono response = client.index(request -> request.index("spring-data") - .type("elasticsearch") .id(randomID()) - .source(singletonMap("feature", "reactive-client")) - .setRefreshPolicy(IMMEDIATE); + .source(singletonMap("feature", "reactive-client")); ); ---- -<1> Use the builder to provide cluster addresses, set default `HttpHeaders` or enable SSL. -<2> when configuring a reactive client, the `withWebClientConfigurer` hook can be used to customize the WebClient. +<.> Use the builder to provide cluster addresses, set default `HttpHeaders` or enable SSL. ==== NOTE: The ReactiveClient response, especially for search operations, is bound to the `from` (offset) & `size` (limit) options of the request. diff --git a/src/main/asciidoc/reference/elasticsearch-migration-guide-4.1-4.2.adoc b/src/main/asciidoc/reference/elasticsearch-migration-guide-4.1-4.2.adoc new file mode 100644 index 000000000..f3a683dbd --- /dev/null +++ b/src/main/asciidoc/reference/elasticsearch-migration-guide-4.1-4.2.adoc @@ -0,0 +1,32 @@ +[[elasticsearch-migration-guide-4.1-4.2]] += Upgrading from 4.1.x to 4.2.x + +This section describes breaking changes from version 4.1.x to 4.2.x and how removed features can be replaced by new introduced features. + +[[elasticsearch-migration-guide-4.1-4.2.deprecations]] +== Deprecations + +[[elasticsearch-migration-guide-4.1-4.2.removal]] +== Removals + +[[elasticsearch-migration-guide-4.1-4.2.breaking-changes]] +== Breaking Changes + +=== RefreshPolicy + +==== Enum package changed + +It was possible in 4.1 to configure the refresh policy for the `ReactiveElasticsearchTemplate` by overriding the method `AbstractReactiveElasticsearchConfiguration.refreshPolicy()` in a custom configuration class. The return value of this method was an instance of the class `org.elasticsearch.action.support.WriteRequest.RefreshPolicy`. + +Now the configuration must return `org.springframework.data.elasticsearch.core.RefreshPolicy`. This enum has the same values and triggers the same behaviour as before, so only the `import` statement has to be adjusted. + +==== Refresh behaviour + +`ElasticsearchOperations` and `ReactiveElasticsearchOperations` now explicitly use the `RefreshPolicy` set on the template for write requests if not null. If the refresh policy is null, then nothing special is done, so the cluster defaults are used. `ElasticsearchOperations` was always using the cluster default before this version. + +The provided implementations for `ElasticsearchRepository` and `ReactiveElasticsearchRepository` will do an explicit refresh when the refresh policy is null. This is the same behaviour as in previous versions. If a refresh policy is set, then it will be used by the repositories as well. + +==== Refresh configuration + +When configuring Spring Data Elasticsearch like described in <> by using `ElasticsearchConfigurationSupport`, `AbstractElasticsearchConfiguration` or `AbstractReactiveElasticsearchConfiguration` the refresh policy will be initialized to `null`. Previously the reactive code initialized this to `IMMEDIATE`, now reactive and +non-reactive code show the same behaviour. diff --git a/src/main/asciidoc/reference/migration-guides.adoc b/src/main/asciidoc/reference/migration-guides.adoc index eaf6630cb..ba5d1e0f7 100644 --- a/src/main/asciidoc/reference/migration-guides.adoc +++ b/src/main/asciidoc/reference/migration-guides.adoc @@ -6,4 +6,6 @@ include::elasticsearch-migration-guide-3.2-4.0.adoc[] include::elasticsearch-migration-guide-4.0-4.1.adoc[] + +include::elasticsearch-migration-guide-4.1-4.2.adoc[] :leveloffset: -1 diff --git a/src/main/java/org/springframework/data/elasticsearch/config/AbstractElasticsearchConfiguration.java b/src/main/java/org/springframework/data/elasticsearch/config/AbstractElasticsearchConfiguration.java index 4f6702cdb..0d5bd9a2f 100644 --- a/src/main/java/org/springframework/data/elasticsearch/config/AbstractElasticsearchConfiguration.java +++ b/src/main/java/org/springframework/data/elasticsearch/config/AbstractElasticsearchConfiguration.java @@ -45,6 +45,10 @@ public abstract class AbstractElasticsearchConfiguration extends ElasticsearchCo @Bean(name = { "elasticsearchOperations", "elasticsearchTemplate" }) public ElasticsearchOperations elasticsearchOperations(ElasticsearchConverter elasticsearchConverter, RestHighLevelClient elasticsearchClient) { - return new ElasticsearchRestTemplate(elasticsearchClient, elasticsearchConverter); + + ElasticsearchRestTemplate template = new ElasticsearchRestTemplate(elasticsearchClient, elasticsearchConverter); + template.setRefreshPolicy(refreshPolicy()); + + return template; } } diff --git a/src/main/java/org/springframework/data/elasticsearch/config/AbstractReactiveElasticsearchConfiguration.java b/src/main/java/org/springframework/data/elasticsearch/config/AbstractReactiveElasticsearchConfiguration.java index 7efba437a..e99ef79b7 100644 --- a/src/main/java/org/springframework/data/elasticsearch/config/AbstractReactiveElasticsearchConfiguration.java +++ b/src/main/java/org/springframework/data/elasticsearch/config/AbstractReactiveElasticsearchConfiguration.java @@ -16,11 +16,11 @@ package org.springframework.data.elasticsearch.config; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.springframework.context.annotation.Bean; import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient; import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations; import org.springframework.data.elasticsearch.core.ReactiveElasticsearchTemplate; +import org.springframework.data.elasticsearch.core.RefreshPolicy; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.lang.Nullable; @@ -58,13 +58,13 @@ public abstract class AbstractReactiveElasticsearchConfiguration extends Elastic } /** - * Set up the write {@link RefreshPolicy}. Default is set to {@link RefreshPolicy#IMMEDIATE}. + * Set up the write {@link RefreshPolicy}. Default is set to null to use the cluster defaults.. * * @return {@literal null} to use the server defaults. */ @Nullable protected RefreshPolicy refreshPolicy() { - return RefreshPolicy.IMMEDIATE; + return null; } /** diff --git a/src/main/java/org/springframework/data/elasticsearch/config/ElasticsearchConfigurationSupport.java b/src/main/java/org/springframework/data/elasticsearch/config/ElasticsearchConfigurationSupport.java index a56ffe0d1..7d1c42c65 100644 --- a/src/main/java/org/springframework/data/elasticsearch/config/ElasticsearchConfigurationSupport.java +++ b/src/main/java/org/springframework/data/elasticsearch/config/ElasticsearchConfigurationSupport.java @@ -28,10 +28,12 @@ import org.springframework.core.convert.converter.Converter; import org.springframework.core.type.filter.AnnotationTypeFilter; import org.springframework.data.annotation.Persistent; import org.springframework.data.elasticsearch.annotations.Document; +import org.springframework.data.elasticsearch.core.RefreshPolicy; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.convert.ElasticsearchCustomConversions; import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter; import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext; +import org.springframework.lang.Nullable; import org.springframework.util.ClassUtils; import org.springframework.util.StringUtils; @@ -44,8 +46,8 @@ import org.springframework.util.StringUtils; public class ElasticsearchConfigurationSupport { @Bean - public ElasticsearchConverter elasticsearchEntityMapper( - SimpleElasticsearchMappingContext elasticsearchMappingContext, ElasticsearchCustomConversions elasticsearchCustomConversions) { + public ElasticsearchConverter elasticsearchEntityMapper(SimpleElasticsearchMappingContext elasticsearchMappingContext, + ElasticsearchCustomConversions elasticsearchCustomConversions) { MappingElasticsearchConverter elasticsearchConverter = new MappingElasticsearchConverter( elasticsearchMappingContext); @@ -61,7 +63,8 @@ public class ElasticsearchConfigurationSupport { * @return never {@literal null}. */ @Bean - public SimpleElasticsearchMappingContext elasticsearchMappingContext(ElasticsearchCustomConversions elasticsearchCustomConversions) { + public SimpleElasticsearchMappingContext elasticsearchMappingContext( + ElasticsearchCustomConversions elasticsearchCustomConversions) { SimpleElasticsearchMappingContext mappingContext = new SimpleElasticsearchMappingContext(); mappingContext.setInitialEntitySet(getInitialEntitySet()); @@ -147,4 +150,14 @@ public class ElasticsearchConfigurationSupport { return initialEntitySet; } + + /** + * Set up the write {@link RefreshPolicy}. Default is set to null to use the cluster defaults.. + * + * @return {@literal null} to use the server defaults. + */ + @Nullable + protected RefreshPolicy refreshPolicy() { + return null; + } } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java index 4f9dae403..a9c07e915 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java @@ -31,6 +31,8 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.support.WriteRequestBuilder; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.MoreLikeThisQueryBuilder; import org.elasticsearch.search.suggest.SuggestBuilder; @@ -82,6 +84,7 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper @Nullable protected RequestFactory requestFactory; @Nullable private EntityOperations entityOperations; @Nullable private EntityCallbacks entityCallbacks; + @Nullable private RefreshPolicy refreshPolicy; // region Initialization protected void initialize(ElasticsearchConverter elasticsearchConverter) { @@ -130,6 +133,15 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper this.entityCallbacks = entityCallbacks; } + + public void setRefreshPolicy(@Nullable RefreshPolicy refreshPolicy) { + this.refreshPolicy = refreshPolicy; + } + + @Nullable + public RefreshPolicy getRefreshPolicy() { + return refreshPolicy; + } // endregion // region DocumentOperations @@ -308,6 +320,41 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper public abstract List doBulkOperation(List queries, BulkOptions bulkOptions, IndexCoordinates index); + + /** + * Pre process the write request before it is sent to the server, eg. by setting the + * {@link WriteRequest#setRefreshPolicy(String) refresh policy} if applicable. + * + * @param request must not be {@literal null}. + * @param + * @return the processed {@link WriteRequest}. + */ + protected > R prepareWriteRequest(R request) { + + if (refreshPolicy == null) { + return request; + } + + return request.setRefreshPolicy(refreshPolicy.toRequestRefreshPolicy()); + } + + /** + * Pre process the write request before it is sent to the server, eg. by setting the + * {@link WriteRequest#setRefreshPolicy(String) refresh policy} if applicable. + * + * @param requestBuilder must not be {@literal null}. + * @param + * @return the processed {@link WriteRequest}. + */ + protected > R prepareWriteRequestBuilder(R requestBuilder) { + + if (refreshPolicy == null) { + return requestBuilder; + } + + return requestBuilder.setRefreshPolicy(refreshPolicy.toRequestRefreshPolicy()); + } + // endregion // region SearchOperations @@ -609,6 +656,7 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper } private IndexQuery getIndexQuery(T entity) { + String id = getEntityId(entity); if (id != null) { @@ -618,7 +666,9 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper IndexQueryBuilder builder = new IndexQueryBuilder() // .withId(id) // .withObject(entity); + SeqNoPrimaryTerm seqNoPrimaryTerm = getEntitySeqNoPrimaryTerm(entity); + if (seqNoPrimaryTerm != null) { builder.withSeqNoPrimaryTerm(seqNoPrimaryTerm); } else { @@ -627,9 +677,11 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper } String routing = getEntityRouting(entity); + if (routing != null) { builder.withRouting(routing); } + return builder.build(); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java index b679ef14e..1a6c94990 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java @@ -139,7 +139,7 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate { // region DocumentOperations public String doIndex(IndexQuery query, IndexCoordinates index) { - IndexRequest request = requestFactory.indexRequest(query, index); + IndexRequest request = prepareWriteRequest(requestFactory.indexRequest(query, index)); IndexResponse indexResponse = execute(client -> client.index(request, RequestOptions.DEFAULT)); // We should call this because we are not going through a mapper. @@ -197,7 +197,8 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate { Assert.notNull(id, "id must not be null"); Assert.notNull(index, "index must not be null"); - DeleteRequest request = requestFactory.deleteRequest(elasticsearchConverter.convertId(id), routing, index); + DeleteRequest request = prepareWriteRequest( + requestFactory.deleteRequest(elasticsearchConverter.convertId(id), routing, index)); return execute(client -> client.delete(request, RequestOptions.DEFAULT).getId()); } @@ -224,7 +225,7 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate { public List doBulkOperation(List queries, BulkOptions bulkOptions, IndexCoordinates index) { - BulkRequest bulkRequest = requestFactory.bulkRequest(queries, bulkOptions, index); + BulkRequest bulkRequest = prepareWriteRequest(requestFactory.bulkRequest(queries, bulkOptions, index)); List indexedObjectInformationList = checkForBulkOperationFailure( execute(client -> client.bulk(bulkRequest, RequestOptions.DEFAULT))); updateIndexedObjectsWithQueries(queries, indexedObjectInformationList); diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java index c734b7c69..f63e12927 100755 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java @@ -147,6 +147,7 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate { public String doIndex(IndexQuery query, IndexCoordinates index) { IndexRequestBuilder indexRequestBuilder = requestFactory.indexRequestBuilder(client, query, index); + indexRequestBuilder = prepareWriteRequestBuilder(indexRequestBuilder); ActionFuture future = indexRequestBuilder.execute(); IndexResponse response; try { @@ -211,8 +212,8 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate { Assert.notNull(id, "id must not be null"); Assert.notNull(index, "index must not be null"); - DeleteRequestBuilder deleteRequestBuilder = requestFactory.deleteRequestBuilder(client, - elasticsearchConverter.convertId(id), routing, index); + DeleteRequestBuilder deleteRequestBuilder = prepareWriteRequestBuilder( + requestFactory.deleteRequestBuilder(client, elasticsearchConverter.convertId(id), routing, index)); return deleteRequestBuilder.execute().actionGet().getId(); } @@ -242,9 +243,10 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate { public List doBulkOperation(List queries, BulkOptions bulkOptions, IndexCoordinates index) { - BulkRequestBuilder bulkRequest = requestFactory.bulkRequestBuilder(client, queries, bulkOptions, index); + BulkRequestBuilder bulkRequestBuilder = requestFactory.bulkRequestBuilder(client, queries, bulkOptions, index); + bulkRequestBuilder = prepareWriteRequestBuilder(bulkRequestBuilder); final List indexedObjectInformations = checkForBulkOperationFailure( - bulkRequest.execute().actionGet()); + bulkRequestBuilder.execute().actionGet()); updateIndexedObjectsWithQueries(queries, indexedObjectInformations); return indexedObjectInformations; } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java index 020723562..e3cb59e87 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java @@ -37,7 +37,6 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.reindex.BulkByScrollResponse; @@ -648,7 +647,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera return request; } - return request.setRefreshPolicy(refreshPolicy); + return request.setRefreshPolicy(refreshPolicy.toRequestRefreshPolicy()); } // endregion diff --git a/src/main/java/org/springframework/data/elasticsearch/core/RefreshPolicy.java b/src/main/java/org/springframework/data/elasticsearch/core/RefreshPolicy.java new file mode 100644 index 000000000..7c768bbd8 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/RefreshPolicy.java @@ -0,0 +1,41 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core; + +import org.elasticsearch.action.support.WriteRequest; + +/** + * Enum mirroring org.elasticsearch.action.support.WriteRequest.RefreshPolicy to keep Elasticsearch classes out of our + * API. + * + * @author Peter-Josef Meisch + * @since 4.2 + */ +public enum RefreshPolicy { + NONE, IMMEDIATE, WAIT_UNTIL; + + WriteRequest.RefreshPolicy toRequestRefreshPolicy() { + switch (this) { + case IMMEDIATE: + return WriteRequest.RefreshPolicy.IMMEDIATE; + case WAIT_UNTIL: + return WriteRequest.RefreshPolicy.WAIT_UNTIL; + case NONE: + default: + return WriteRequest.RefreshPolicy.NONE; + } + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java b/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java index 8f917edd4..9efac343a 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java @@ -958,6 +958,7 @@ class RequestFactory { throw new ElasticsearchException( "object or source is null, failed to index the document [id: " + query.getId() + ']'); } + if (query.getVersion() != null) { indexRequestBuilder.setVersion(query.getVersion()); VersionType versionType = retrieveVersionTypeFromPersistentEntity(query.getObject().getClass()); @@ -967,9 +968,11 @@ class RequestFactory { if (query.getSeqNo() != null) { indexRequestBuilder.setIfSeqNo(query.getSeqNo()); } + if (query.getPrimaryTerm() != null) { indexRequestBuilder.setIfPrimaryTerm(query.getPrimaryTerm()); } + if (query.getRouting() != null) { indexRequestBuilder.setRouting(query.getRouting()); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/query/DeleteQuery.java b/src/main/java/org/springframework/data/elasticsearch/core/query/DeleteQuery.java index 1dc5179ad..7bd1f5767 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/query/DeleteQuery.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/query/DeleteQuery.java @@ -15,6 +15,8 @@ */ package org.springframework.data.elasticsearch.core.query; +import java.time.Duration; + import org.elasticsearch.index.query.QueryBuilder; import org.springframework.lang.Nullable; @@ -24,7 +26,8 @@ import org.springframework.lang.Nullable; * @author Rizwan Idrees * @author Mohsin Husen * @author Peter-Josef Meisch - * @deprecated since 4.0, use {@link Query} implementations and set {@link Query#setScrollTimeInMillis(Long)} and {@link Query#getMaxResults()} + * @deprecated since 4.0, use {@link Query} implementations and set {@link Query#setScrollTime(Duration)} and + * {@link Query#getMaxResults()} */ @Deprecated public class DeleteQuery { diff --git a/src/main/java/org/springframework/data/elasticsearch/core/query/IndexQuery.java b/src/main/java/org/springframework/data/elasticsearch/core/query/IndexQuery.java index f40235a68..f56e7ebeb 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/query/IndexQuery.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/query/IndexQuery.java @@ -38,6 +38,22 @@ public class IndexQuery { @Nullable private String routing; @Nullable private OpType opType; + public IndexQuery() {} + + public IndexQuery(@Nullable String id, @Nullable Object object, @Nullable Long version, @Nullable String source, + @Nullable String parentId, @Nullable Long seqNo, @Nullable Long primaryTerm, @Nullable String routing, + @Nullable OpType opType) { + this.id = id; + this.object = object; + this.version = version; + this.source = source; + this.parentId = parentId; + this.seqNo = seqNo; + this.primaryTerm = primaryTerm; + this.routing = routing; + this.opType = opType; + } + @Nullable public String getId() { return id; diff --git a/src/main/java/org/springframework/data/elasticsearch/core/query/IndexQueryBuilder.java b/src/main/java/org/springframework/data/elasticsearch/core/query/IndexQueryBuilder.java index a35c8211e..ffc7dd9a3 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/query/IndexQueryBuilder.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/query/IndexQueryBuilder.java @@ -15,6 +15,7 @@ */ package org.springframework.data.elasticsearch.core.query; +import org.springframework.data.elasticsearch.core.RefreshPolicy; import org.springframework.lang.Nullable; /** @@ -37,6 +38,9 @@ public class IndexQueryBuilder { @Nullable private Long primaryTerm; @Nullable private String routing; @Nullable private IndexQuery.OpType opType; + @Nullable private RefreshPolicy refreshPolicy; + + public IndexQueryBuilder() {} public IndexQueryBuilder withId(String id) { this.id = id; @@ -84,16 +88,6 @@ public class IndexQueryBuilder { } public IndexQuery build() { - IndexQuery indexQuery = new IndexQuery(); - indexQuery.setId(id); - indexQuery.setObject(object); - indexQuery.setParentId(parentId); - indexQuery.setSource(source); - indexQuery.setVersion(version); - indexQuery.setSeqNo(seqNo); - indexQuery.setPrimaryTerm(primaryTerm); - indexQuery.setRouting(routing); - indexQuery.setOpType(opType); - return indexQuery; + return new IndexQuery(id, object, version, source, parentId, seqNo, primaryTerm, routing, opType); } } diff --git a/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleElasticsearchRepository.java b/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleElasticsearchRepository.java index 69d88a34a..f456b65f4 100644 --- a/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleElasticsearchRepository.java +++ b/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleElasticsearchRepository.java @@ -27,14 +27,15 @@ import org.elasticsearch.index.query.IdsQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.springframework.data.domain.Page; import org.springframework.data.domain.PageImpl; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Sort; +import org.springframework.data.elasticsearch.core.AbstractElasticsearchTemplate; import org.springframework.data.elasticsearch.core.ElasticsearchOperations; import org.springframework.data.elasticsearch.core.IndexOperations; +import org.springframework.data.elasticsearch.core.RefreshPolicy; import org.springframework.data.elasticsearch.core.SearchHit; import org.springframework.data.elasticsearch.core.SearchHitSupport; import org.springframework.data.elasticsearch.core.SearchHits; @@ -346,10 +347,24 @@ public class SimpleElasticsearchRepository implements ElasticsearchReposi } @Override + @Deprecated public void refresh() { indexOperations.refresh(); } + private void doRefresh() { + RefreshPolicy refreshPolicy = null; + + if (operations instanceof AbstractElasticsearchTemplate) { + refreshPolicy = ((AbstractElasticsearchTemplate) operations).getRefreshPolicy(); + } + + if (refreshPolicy == null) { + indexOperations.refresh(); + } + } + + // region helper functions @Nullable protected ID extractIdFromBean(T entity) { return entityInformation.getId(entity); @@ -376,6 +391,7 @@ public class SimpleElasticsearchRepository implements ElasticsearchReposi return new NativeSearchQueryBuilder().withIds(stringIds).build(); } + // endregion // region operations callback @FunctionalInterface @@ -392,7 +408,7 @@ public class SimpleElasticsearchRepository implements ElasticsearchReposi @Nullable public R executeAndRefresh(OperationsCallback callback) { R result = callback.doWithOperations(operations); - refresh(); + doRefresh(); return result; } // endregion diff --git a/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java b/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java index aa7b4c3b3..0bdf58436 100644 --- a/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java +++ b/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java @@ -16,10 +16,10 @@ package org.springframework.data.elasticsearch.repository.support; import org.elasticsearch.index.query.IdsQueryBuilder; +import org.springframework.data.elasticsearch.core.RefreshPolicy; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -259,11 +259,10 @@ public class SimpleReactiveElasticsearchRepository implements ReactiveEla refreshPolicy = ((ReactiveElasticsearchTemplate) operations).getRefreshPolicy(); } - if (refreshPolicy == null || refreshPolicy == RefreshPolicy.NONE) { + if (refreshPolicy == null) { return indexOperations.refresh(); } return Mono.empty(); } - } diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientIntegrationTests.java index 908f2a51a..cbb8e399d 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientIntegrationTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientIntegrationTests.java @@ -37,7 +37,6 @@ import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.MultiGetRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.get.GetResult; @@ -86,6 +85,11 @@ public class ReactiveElasticsearchClientIntegrationTests { public ReactiveElasticsearchClient reactiveElasticsearchClient() { return super.reactiveElasticsearchClient(); } + + @Override + protected org.springframework.data.elasticsearch.core.RefreshPolicy refreshPolicy() { + return org.springframework.data.elasticsearch.core.RefreshPolicy.IMMEDIATE; + } } static final String INDEX_I = "idx-1-reactive-client-tests"; @@ -745,7 +749,6 @@ public class ReactiveElasticsearchClientIntegrationTests { return new IndexRequest(ReactiveElasticsearchClientIntegrationTests.INDEX_I) // .id(UUID.randomUUID().toString()) // .source(ReactiveElasticsearchClientIntegrationTests.DOC_SOURCE) // - .setRefreshPolicy(RefreshPolicy.IMMEDIATE) // .create(true); } diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java index 9e276d752..343f80a6e 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java @@ -153,12 +153,9 @@ public class ReactiveElasticsearchTemplateIntegrationTests { SampleEntity sampleEntity = randomEntity("foo bar"); - template.save(sampleEntity)// - .as(StepVerifier::create)// - .expectNextCount(1)// - .verifyComplete(); - - indexOperations.refresh(); + template.save(sampleEntity) // + .then(indexOperations.refresh()) // + .block(); template .search(new CriteriaQuery(Criteria.where("message").is(sampleEntity.getMessage())), SampleEntity.class, @@ -842,11 +839,8 @@ public class ReactiveElasticsearchTemplateIntegrationTests { entity2.rate = 2; template.saveAll(Mono.just(Arrays.asList(entity1, entity2)), IndexCoordinates.of(DEFAULT_INDEX)) // - .as(StepVerifier::create) // - .expectNext(entity1) // - .expectNext(entity2) // - .verifyComplete(); - indexOperations.refresh(); + .then(indexOperations.refresh()) // + .block(); NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build(); template.search(searchQuery, SampleEntity.class, IndexCoordinates.of(DEFAULT_INDEX)) // diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateUnitTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateUnitTests.java index 6a9c5a54c..76da6ae5f 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateUnitTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateUnitTests.java @@ -38,7 +38,6 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.main.MainResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.DeleteByQueryRequest; @@ -94,7 +93,7 @@ public class ReactiveElasticsearchTemplateUnitTests { .as(StepVerifier::create) // .verifyComplete(); - assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(RefreshPolicy.IMMEDIATE); + assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(RefreshPolicy.IMMEDIATE.toRequestRefreshPolicy()); } @Test // DATAES-504 @@ -109,7 +108,7 @@ public class ReactiveElasticsearchTemplateUnitTests { .as(StepVerifier::create) // .verifyComplete(); - assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(RefreshPolicy.WAIT_UNTIL); + assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(RefreshPolicy.WAIT_UNTIL.toRequestRefreshPolicy()); } @Test // DATAES-504, DATAES-518 @@ -179,7 +178,7 @@ public class ReactiveElasticsearchTemplateUnitTests { .as(StepVerifier::create) // .verifyComplete(); - assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(RefreshPolicy.IMMEDIATE); + assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(RefreshPolicy.IMMEDIATE.toRequestRefreshPolicy()); } @Test // DATAES-504 @@ -194,7 +193,7 @@ public class ReactiveElasticsearchTemplateUnitTests { .as(StepVerifier::create) // .verifyComplete(); - assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(RefreshPolicy.WAIT_UNTIL); + assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(RefreshPolicy.WAIT_UNTIL.toRequestRefreshPolicy()); } @Test // DATAES-504 diff --git a/src/test/java/org/springframework/data/elasticsearch/junit/jupiter/ElasticsearchRestTemplateConfiguration.java b/src/test/java/org/springframework/data/elasticsearch/junit/jupiter/ElasticsearchRestTemplateConfiguration.java index 16de3a40c..2d27003b8 100644 --- a/src/test/java/org/springframework/data/elasticsearch/junit/jupiter/ElasticsearchRestTemplateConfiguration.java +++ b/src/test/java/org/springframework/data/elasticsearch/junit/jupiter/ElasticsearchRestTemplateConfiguration.java @@ -27,6 +27,7 @@ import org.springframework.data.elasticsearch.client.RestClients; import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration; import org.springframework.data.elasticsearch.core.ElasticsearchOperations; import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate; +import org.springframework.data.elasticsearch.core.RefreshPolicy; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; /** @@ -84,4 +85,9 @@ public class ElasticsearchRestTemplateConfiguration extends AbstractElasticsearc } }; } + + @Override + protected RefreshPolicy refreshPolicy() { + return RefreshPolicy.IMMEDIATE; + } } diff --git a/src/test/java/org/springframework/data/elasticsearch/junit/jupiter/ElasticsearchTemplateConfiguration.java b/src/test/java/org/springframework/data/elasticsearch/junit/jupiter/ElasticsearchTemplateConfiguration.java index 03a42f2cf..151fbdb8a 100644 --- a/src/test/java/org/springframework/data/elasticsearch/junit/jupiter/ElasticsearchTemplateConfiguration.java +++ b/src/test/java/org/springframework/data/elasticsearch/junit/jupiter/ElasticsearchTemplateConfiguration.java @@ -27,6 +27,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.elasticsearch.config.ElasticsearchConfigurationSupport; import org.springframework.data.elasticsearch.core.ElasticsearchTemplate; +import org.springframework.data.elasticsearch.core.RefreshPolicy; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; /** @@ -52,6 +53,15 @@ public class ElasticsearchTemplateConfiguration extends ElasticsearchConfigurati @Bean(name = { "elasticsearchOperations", "elasticsearchTemplate" }) public ElasticsearchTemplate elasticsearchTemplate(Client elasticsearchClient, ElasticsearchConverter elasticsearchConverter) { - return new ElasticsearchTemplate(elasticsearchClient, elasticsearchConverter); + + ElasticsearchTemplate template = new ElasticsearchTemplate(elasticsearchClient, elasticsearchConverter); + template.setRefreshPolicy(refreshPolicy()); + + return template; + } + + @Override + protected RefreshPolicy refreshPolicy() { + return RefreshPolicy.IMMEDIATE; } } diff --git a/src/test/java/org/springframework/data/elasticsearch/junit/jupiter/ReactiveElasticsearchRestTemplateConfiguration.java b/src/test/java/org/springframework/data/elasticsearch/junit/jupiter/ReactiveElasticsearchRestTemplateConfiguration.java index a1761d125..15e456615 100644 --- a/src/test/java/org/springframework/data/elasticsearch/junit/jupiter/ReactiveElasticsearchRestTemplateConfiguration.java +++ b/src/test/java/org/springframework/data/elasticsearch/junit/jupiter/ReactiveElasticsearchRestTemplateConfiguration.java @@ -23,6 +23,7 @@ import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient; import org.springframework.data.elasticsearch.client.reactive.ReactiveRestClients; import org.springframework.data.elasticsearch.config.AbstractReactiveElasticsearchConfiguration; +import org.springframework.data.elasticsearch.core.RefreshPolicy; /** * Configuration for Spring Data Elasticsearch Integration Tests using @@ -52,4 +53,9 @@ public class ReactiveElasticsearchRestTemplateConfiguration extends AbstractReac .withSocketTimeout(Duration.ofSeconds(20)) // .build()); } + + @Override + protected RefreshPolicy refreshPolicy() { + return RefreshPolicy.IMMEDIATE; + } }