From 18be55874054fd28d53717a8b69a0d69287efcba Mon Sep 17 00:00:00 2001 From: Peter-Josef Meisch Date: Sat, 20 Aug 2022 18:23:50 +0200 Subject: [PATCH] Reactive implementation of the point in time API. This PR adds the reactive implementation for the point in time API that was missing in #2273. Original Pull Request #2275 Closes #2274 --- .../elc/ReactiveElasticsearchClient.java | 42 +++++++ .../elc/ReactiveElasticsearchTemplate.java | 25 ++++ ...AbstractReactiveElasticsearchTemplate.java | 13 ++ .../core/ReactiveSearchHits.java | 10 ++ .../core/ReactiveSearchHitsImpl.java | 9 ++ .../core/ReactiveSearchOperations.java | 33 +++++ ...eactivePointInTimeELCIntegrationTests.java | 40 +++++++ ...ctivePointInTimeERHLCIntegrationTests.java | 47 ++++++++ .../ReactivePointInTimeIntegrationTests.java | 113 ++++++++++++++++++ 9 files changed, 332 insertions(+) create mode 100644 src/test/java/org/springframework/data/elasticsearch/core/ReactivePointInTimeELCIntegrationTests.java create mode 100644 src/test/java/org/springframework/data/elasticsearch/core/ReactivePointInTimeERHLCIntegrationTests.java create mode 100644 src/test/java/org/springframework/data/elasticsearch/core/ReactivePointInTimeIntegrationTests.java diff --git a/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchClient.java index 6b0782c0e..ebfb10fae 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchClient.java @@ -276,6 +276,48 @@ public class ReactiveElasticsearchClient extends ApiClient openPointInTime(OpenPointInTimeRequest request) { + + Assert.notNull(request, "request must not be null"); + + return Mono.fromFuture(transport.performRequestAsync(request, OpenPointInTimeRequest._ENDPOINT, transportOptions)); + } + + /** + * @since 5.0 + */ + public Mono openPointInTime( + Function> fn) { + + Assert.notNull(fn, "fn must not be null"); + + return openPointInTime(fn.apply(new OpenPointInTimeRequest.Builder()).build()); + } + + /** + * @since 5.0 + */ + public Mono closePointInTime(ClosePointInTimeRequest request) { + + Assert.notNull(request, "request must not be null"); + + return Mono.fromFuture(transport.performRequestAsync(request, ClosePointInTimeRequest._ENDPOINT, transportOptions)); + } + + /** + * @since 5.0 + */ + public Mono closePointInTime( + Function> fn) { + + Assert.notNull(fn, "fn must not be null"); + + return closePointInTime(fn.apply(new ClosePointInTimeRequest.Builder()).build()); + } // endregion } diff --git a/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java index 95f5904be..d8be499d4 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java @@ -30,6 +30,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; +import java.time.Duration; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -408,6 +409,30 @@ public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearch }); } + @Override + public Mono openPointInTime(IndexCoordinates index, Duration keepAlive, Boolean ignoreUnavailable) { + + Assert.notNull(index, "index must not be null"); + Assert.notNull(keepAlive, "keepAlive must not be null"); + Assert.notNull(ignoreUnavailable, "ignoreUnavailable must not be null"); + + var request = requestConverter.searchOpenPointInTimeRequest(index, keepAlive, ignoreUnavailable); + return Mono + .from(execute((ClientCallback>) client -> client.openPointInTime(request))) + .map(OpenPointInTimeResponse::id); + } + + @Override + public Mono closePointInTime(String pit) { + + Assert.notNull(pit, "pit must not be null"); + + ClosePointInTimeRequest request = requestConverter.searchClosePointInTime(pit); + return Mono + .from(execute((ClientCallback>) client -> client.closePointInTime(request))) + .map(ClosePointInTimeResponse::succeeded); + } + // endregion @Override diff --git a/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java index f919d6893..6c5f399f6 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java @@ -19,6 +19,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; +import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.stream.Collectors; @@ -29,6 +30,7 @@ import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.data.convert.EntityReader; +import org.springframework.data.elasticsearch.client.UnsupportedClientOperationException; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter; import org.springframework.data.elasticsearch.core.document.Document; @@ -476,6 +478,17 @@ abstract public class AbstractReactiveElasticsearchTemplate } abstract protected Mono doCount(Query query, Class entityType, IndexCoordinates index); + + @Override + public Mono openPointInTime(IndexCoordinates index, Duration keepAlive, Boolean ignoreUnavailable) { + throw new UnsupportedClientOperationException(getClass(), "openPointInTime"); + } + + @Override + public Mono closePointInTime(String pit) { + throw new UnsupportedClientOperationException(getClass(), "closePointInTime"); + } + // endregion // region callbacks diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchHits.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchHits.java index 6c387c81a..885749bfa 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchHits.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchHits.java @@ -72,4 +72,14 @@ public interface ReactiveSearchHits { * @return wether the {@link SearchHits} has a suggest response. */ boolean hasSuggest(); + + /** + * When doing a search with a point in time, the response contains a new point in time id value. + * + * @return the new point in time id, if one was returned from Elasticsearch + * @since 5.0 + */ + @Nullable + String getPointInTimeId(); + } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchHitsImpl.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchHitsImpl.java index 2e05d734e..75b305c95 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchHitsImpl.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchHitsImpl.java @@ -78,4 +78,13 @@ public class ReactiveSearchHitsImpl implements ReactiveSearchHits { public boolean hasSuggest() { return delegate.hasSuggest(); } + + /** + * @since 5.0 + */ + @Nullable + @Override + public String getPointInTimeId() { + return delegate.getPointInTimeId(); + } } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchOperations.java index 2680abb51..e61c33040 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchOperations.java @@ -18,6 +18,7 @@ package org.springframework.data.elasticsearch.core; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.time.Duration; import java.util.List; import org.springframework.data.domain.Pageable; @@ -271,6 +272,38 @@ public interface ReactiveSearchOperations { */ Mono suggest(Query query, Class entityType, IndexCoordinates index); + /** + * Opens a point in time (pit) in Elasticsearch. + * + * @param index the index name(s) to use + * @param keepAlive the duration the pit shoult be kept alive + * @return the pit identifier + * @since 5.0 + */ + default Mono openPointInTime(IndexCoordinates index, Duration keepAlive) { + return openPointInTime(index, keepAlive, false); + } + + /** + * Opens a point in time (pit) in Elasticsearch. + * + * @param index the index name(s) to use + * @param keepAlive the duration the pit shoult be kept alive + * @param ignoreUnavailable if {$literal true} the call will fail if any of the indices is missing or closed + * @return the pit identifier + * @since 5.0 + */ + Mono openPointInTime(IndexCoordinates index, Duration keepAlive, Boolean ignoreUnavailable); + + /** + * Closes a point in time + * + * @param pit the pit identifier as returned by {@link #openPointInTime(IndexCoordinates, Duration, Boolean)} + * @return {@literal true} on success + * @since 5.0 + */ + Mono closePointInTime(String pit); + // region helper /** * Creates a {@link Query} to find all documents. Must be implemented by the concrete implementations to provide an diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactivePointInTimeELCIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactivePointInTimeELCIntegrationTests.java new file mode 100644 index 000000000..fa46aacc0 --- /dev/null +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactivePointInTimeELCIntegrationTests.java @@ -0,0 +1,40 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.data.elasticsearch.junit.jupiter.ReactiveElasticsearchTemplateConfiguration; +import org.springframework.data.elasticsearch.utils.IndexNameProvider; +import org.springframework.test.context.ContextConfiguration; + +/** + * @author Peter-Josef Meisch + * @since 5.0 + */ +@ContextConfiguration(classes = ReactivePointInTimeELCIntegrationTests.Config.class) +public class ReactivePointInTimeELCIntegrationTests extends ReactivePointInTimeIntegrationTests { + + @Configuration + @Import({ ReactiveElasticsearchTemplateConfiguration.class }) + static class Config { + @Bean + IndexNameProvider indexNameProvider() { + return new IndexNameProvider("reactive-point-in-time"); + } + } +} diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactivePointInTimeERHLCIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactivePointInTimeERHLCIntegrationTests.java new file mode 100644 index 000000000..bcc4d34ac --- /dev/null +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactivePointInTimeERHLCIntegrationTests.java @@ -0,0 +1,47 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core; + +import org.junit.jupiter.api.Disabled; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.data.elasticsearch.junit.jupiter.ReactiveElasticsearchRestTemplateConfiguration; +import org.springframework.data.elasticsearch.utils.IndexNameProvider; +import org.springframework.test.context.ContextConfiguration; + +/** + * This test class is disabled on purpose. PIT will be introduced in Spring Data Elasticsearch 5.0 where the old + * RestHighLevelClient and the {@link org.springframework.data.elasticsearch.client.erhlc.ElasticsearchRestTemplate} are + * deprecated. We therefore do not add new features to this implementation anymore. Furthermore we cannot copy the + * necessary code for the reactive implementation like we did before, as point in time was introduced in Elasticsearch + * 7.12 after the license change. + * + * @author Peter-Josef Meisch + */ +@Disabled +@ContextConfiguration(classes = ReactivePointInTimeERHLCIntegrationTests.Config.class) +public class ReactivePointInTimeERHLCIntegrationTests extends ReactivePointInTimeIntegrationTests { + + @Configuration + @Import({ ReactiveElasticsearchRestTemplateConfiguration.class }) + static class Config { + @Bean + IndexNameProvider indexNameProvider() { + return new IndexNameProvider("reactive-point-in-time-es7"); + } + } +} diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactivePointInTimeIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactivePointInTimeIntegrationTests.java new file mode 100644 index 000000000..f10181b4a --- /dev/null +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactivePointInTimeIntegrationTests.java @@ -0,0 +1,113 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core; + +import static org.assertj.core.api.Assertions.*; + +import java.time.Duration; +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.annotation.Id; +import org.springframework.data.elasticsearch.annotations.Document; +import org.springframework.data.elasticsearch.annotations.Field; +import org.springframework.data.elasticsearch.annotations.FieldType; +import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; +import org.springframework.data.elasticsearch.core.query.Criteria; +import org.springframework.data.elasticsearch.core.query.CriteriaQuery; +import org.springframework.data.elasticsearch.core.query.CriteriaQueryBuilder; +import org.springframework.data.elasticsearch.core.query.Query; +import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest; +import org.springframework.data.elasticsearch.utils.IndexNameProvider; +import org.springframework.lang.Nullable; +import org.springframework.util.StringUtils; + +/** + * Integration tests for the point in time API. + * + * @author Peter-Josef Meisch + */ +@SpringIntegrationTest +public abstract class ReactivePointInTimeIntegrationTests { + + @Autowired ReactiveElasticsearchOperations operations; + @Autowired IndexNameProvider indexNameProvider; + @Nullable ReactiveIndexOperations indexOperations; + + @BeforeEach + void setUp() { + indexNameProvider.increment(); + indexOperations = operations.indexOps(SampleEntity.class); + indexOperations.createWithMapping().block(); + } + + @Test + @Order(Integer.MAX_VALUE) + void cleanup() { + operations.indexOps(IndexCoordinates.of(indexNameProvider.getPrefix() + '*')).delete().block(); + } + + @Test // #1684 + @DisplayName("should create pit search with it and delete it again") + void shouldCreatePitSearchWithItAndDeleteItAgain() { + + // insert 2 records, one smith + List eList = List.of(new SampleEntity("1", "John", "Smith"), new SampleEntity("2", "Mike", "Cutter")); + operations.saveAll(eList, SampleEntity.class).blockLast(); + + // seach for smith + var searchQuery = new CriteriaQuery(Criteria.where("lastName").is("Smith")); + var searchHits = operations.searchForHits(searchQuery, SampleEntity.class).block(); + assertThat(searchHits.getTotalHits()).isEqualTo(1); + + // create pit + var pit = operations.openPointInTime(IndexCoordinates.of(indexNameProvider.indexName()), Duration.ofMinutes(10)) + .block(); + assertThat(StringUtils.hasText(pit)).isTrue(); + + // add another smith + operations.save(new SampleEntity("3", "Harry", "Smith")).block(); + + // search with pit -> 1 smith + var pitQuery = new CriteriaQueryBuilder(Criteria.where("lastName").is("Smith")) // + .withPointInTime(new Query.PointInTime(pit, Duration.ofMinutes(10))) // + .build(); + searchHits = operations.searchForHits(pitQuery, SampleEntity.class).block(); + assertThat(searchHits.getTotalHits()).isEqualTo(1); + var newPit = searchHits.getPointInTimeId(); + assertThat(StringUtils.hasText(newPit)).isTrue(); + + // search without pit -> 2 smiths + searchHits = operations.searchForHits(searchQuery, SampleEntity.class).block(); + assertThat(searchHits.getTotalHits()).isEqualTo(2); + + // close pit + var success = operations.closePointInTime(newPit).block(); + assertThat(success).isTrue(); + } + + @Document(indexName = "#{@indexNameProvider.indexName()}") + record SampleEntity( // + @Nullable @Id String id, // + @Field(type = FieldType.Text) String firstName, // + @Field(type = FieldType.Text) String lastName // + ) { + } +}