From 628c925c9a1a6a2406afbdd9f028632394e70a84 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Mon, 14 Sep 2020 11:32:01 +0200 Subject: [PATCH] DATAES-922 - Move off deprecated Reactor API. Use .next() instead of publishNext(). Use direct Mono instead of toProcessor(). --- .../reactive/DefaultReactiveElasticsearchClient.java | 12 ++++++------ .../core/ReactiveElasticsearchTemplate.java | 2 +- ...tiveElasticsearchParametersParameterAccessor.java | 7 +++---- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index a5ce6cde3..d17cbf3cc 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -371,7 +371,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch */ @Override public Mono index(HttpHeaders headers, IndexRequest indexRequest) { - return sendRequest(indexRequest, requestCreator.index(), IndexResponse.class, headers).publishNext(); + return sendRequest(indexRequest, requestCreator.index(), IndexResponse.class, headers).next(); } /* @@ -389,7 +389,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch */ @Override public Mono update(HttpHeaders headers, UpdateRequest updateRequest) { - return sendRequest(updateRequest, requestCreator.update(), UpdateResponse.class, headers).publishNext(); + return sendRequest(updateRequest, requestCreator.update(), UpdateResponse.class, headers).next(); } /* @@ -400,7 +400,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch public Mono delete(HttpHeaders headers, DeleteRequest deleteRequest) { return sendRequest(deleteRequest, requestCreator.delete(), DeleteResponse.class, headers) // - .publishNext(); + .next(); } /* @@ -556,7 +556,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch public Mono deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest) { return sendRequest(deleteRequest, requestCreator.deleteByQuery(), BulkByScrollResponse.class, headers) // - .publishNext(); + .next(); } /* @@ -566,7 +566,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch @Override public Mono bulk(HttpHeaders headers, BulkRequest bulkRequest) { return sendRequest(bulkRequest, requestCreator.bulk(), BulkResponse.class, headers) // - .publishNext(); + .next(); } /* @@ -753,7 +753,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch @Override public Mono getAliases(HttpHeaders headers, GetAliasesRequest getAliasesRequest) { - return sendRequest(getAliasesRequest, requestCreator.getAlias(), GetAliasesResponse.class, headers).publishNext(); + return sendRequest(getAliasesRequest, requestCreator.getAlias(), GetAliasesResponse.class, headers).next(); } @Override diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java index 0064f03ef..39a6ef2bb 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java @@ -523,7 +523,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera Assert.notNull(query, "Query must not be null!"); - return doDeleteBy(query, entityType, index).map(BulkByScrollResponse::getDeleted).publishNext(); + return doDeleteBy(query, entityType, index).map(BulkByScrollResponse::getDeleted).next(); } @Override diff --git a/src/main/java/org/springframework/data/elasticsearch/repository/query/ReactiveElasticsearchParametersParameterAccessor.java b/src/main/java/org/springframework/data/elasticsearch/repository/query/ReactiveElasticsearchParametersParameterAccessor.java index 38611e5d0..a1072efd5 100644 --- a/src/main/java/org/springframework/data/elasticsearch/repository/query/ReactiveElasticsearchParametersParameterAccessor.java +++ b/src/main/java/org/springframework/data/elasticsearch/repository/query/ReactiveElasticsearchParametersParameterAccessor.java @@ -17,7 +17,6 @@ package org.springframework.data.elasticsearch.repository.query; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; import java.util.ArrayList; import java.util.List; @@ -31,7 +30,7 @@ import org.springframework.data.repository.util.ReactiveWrappers; */ class ReactiveElasticsearchParametersParameterAccessor extends ElasticsearchParametersParameterAccessor { - private final List> subscriptions; + private final List> subscriptions; /** * Creates a new {@link ElasticsearchParametersParameterAccessor}. @@ -53,9 +52,9 @@ class ReactiveElasticsearchParametersParameterAccessor extends ElasticsearchPara } if (ReactiveWrappers.isSingleValueType(value.getClass())) { - subscriptions.add(ReactiveWrapperConverters.toWrapper(value, Mono.class).toProcessor()); + subscriptions.add(ReactiveWrapperConverters.toWrapper(value, Mono.class)); } else { - subscriptions.add(ReactiveWrapperConverters.toWrapper(value, Flux.class).collectList().toProcessor()); + subscriptions.add(ReactiveWrapperConverters.toWrapper(value, Flux.class).collectList()); } } }