DATAES-922 - Move off deprecated Reactor API.

Use .next() instead of publishNext(). Use direct Mono instead of toProcessor().
This commit is contained in:
Mark Paluch 2020-09-14 11:32:01 +02:00
parent fd707abdf0
commit 628c925c9a
No known key found for this signature in database
GPG Key ID: 51A00FA751B91849
3 changed files with 10 additions and 11 deletions

View File

@ -371,7 +371,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
*/ */
@Override @Override
public Mono<IndexResponse> index(HttpHeaders headers, IndexRequest indexRequest) { public Mono<IndexResponse> index(HttpHeaders headers, IndexRequest indexRequest) {
return sendRequest(indexRequest, requestCreator.index(), IndexResponse.class, headers).publishNext(); return sendRequest(indexRequest, requestCreator.index(), IndexResponse.class, headers).next();
} }
/* /*
@ -389,7 +389,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
*/ */
@Override @Override
public Mono<UpdateResponse> update(HttpHeaders headers, UpdateRequest updateRequest) { public Mono<UpdateResponse> update(HttpHeaders headers, UpdateRequest updateRequest) {
return sendRequest(updateRequest, requestCreator.update(), UpdateResponse.class, headers).publishNext(); return sendRequest(updateRequest, requestCreator.update(), UpdateResponse.class, headers).next();
} }
/* /*
@ -400,7 +400,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
public Mono<DeleteResponse> delete(HttpHeaders headers, DeleteRequest deleteRequest) { public Mono<DeleteResponse> delete(HttpHeaders headers, DeleteRequest deleteRequest) {
return sendRequest(deleteRequest, requestCreator.delete(), DeleteResponse.class, headers) // return sendRequest(deleteRequest, requestCreator.delete(), DeleteResponse.class, headers) //
.publishNext(); .next();
} }
/* /*
@ -556,7 +556,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
public Mono<BulkByScrollResponse> deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest) { public Mono<BulkByScrollResponse> deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest) {
return sendRequest(deleteRequest, requestCreator.deleteByQuery(), BulkByScrollResponse.class, headers) // return sendRequest(deleteRequest, requestCreator.deleteByQuery(), BulkByScrollResponse.class, headers) //
.publishNext(); .next();
} }
/* /*
@ -566,7 +566,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
@Override @Override
public Mono<BulkResponse> bulk(HttpHeaders headers, BulkRequest bulkRequest) { public Mono<BulkResponse> bulk(HttpHeaders headers, BulkRequest bulkRequest) {
return sendRequest(bulkRequest, requestCreator.bulk(), BulkResponse.class, headers) // return sendRequest(bulkRequest, requestCreator.bulk(), BulkResponse.class, headers) //
.publishNext(); .next();
} }
/* /*
@ -753,7 +753,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
@Override @Override
public Mono<GetAliasesResponse> getAliases(HttpHeaders headers, GetAliasesRequest getAliasesRequest) { public Mono<GetAliasesResponse> getAliases(HttpHeaders headers, GetAliasesRequest getAliasesRequest) {
return sendRequest(getAliasesRequest, requestCreator.getAlias(), GetAliasesResponse.class, headers).publishNext(); return sendRequest(getAliasesRequest, requestCreator.getAlias(), GetAliasesResponse.class, headers).next();
} }
@Override @Override

View File

@ -523,7 +523,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
Assert.notNull(query, "Query must not be null!"); Assert.notNull(query, "Query must not be null!");
return doDeleteBy(query, entityType, index).map(BulkByScrollResponse::getDeleted).publishNext(); return doDeleteBy(query, entityType, index).map(BulkByScrollResponse::getDeleted).next();
} }
@Override @Override

View File

@ -17,7 +17,6 @@ package org.springframework.data.elasticsearch.repository.query;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -31,7 +30,7 @@ import org.springframework.data.repository.util.ReactiveWrappers;
*/ */
class ReactiveElasticsearchParametersParameterAccessor extends ElasticsearchParametersParameterAccessor { class ReactiveElasticsearchParametersParameterAccessor extends ElasticsearchParametersParameterAccessor {
private final List<MonoProcessor<?>> subscriptions; private final List<Mono<?>> subscriptions;
/** /**
* Creates a new {@link ElasticsearchParametersParameterAccessor}. * Creates a new {@link ElasticsearchParametersParameterAccessor}.
@ -53,9 +52,9 @@ class ReactiveElasticsearchParametersParameterAccessor extends ElasticsearchPara
} }
if (ReactiveWrappers.isSingleValueType(value.getClass())) { if (ReactiveWrappers.isSingleValueType(value.getClass())) {
subscriptions.add(ReactiveWrapperConverters.toWrapper(value, Mono.class).toProcessor()); subscriptions.add(ReactiveWrapperConverters.toWrapper(value, Mono.class));
} else { } else {
subscriptions.add(ReactiveWrapperConverters.toWrapper(value, Flux.class).collectList().toProcessor()); subscriptions.add(ReactiveWrapperConverters.toWrapper(value, Flux.class).collectList());
} }
} }
} }