From 5b1e179e885ee1d9e7dab33e7f3eb963391f9b96 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Tue, 23 Jun 2020 09:01:10 +0200 Subject: [PATCH] DATAES-870 - Polishing. Simplify single-node flow. --- .../reactive/SingleNodeHostProvider.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProvider.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProvider.java index eae140c90..77679e2fc 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProvider.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/SingleNodeHostProvider.java @@ -15,7 +15,6 @@ */ package org.springframework.data.elasticsearch.client.reactive; -import org.springframework.http.HttpHeaders; import reactor.core.publisher.Mono; import java.net.InetSocketAddress; @@ -25,6 +24,7 @@ import java.util.function.Supplier; import org.springframework.data.elasticsearch.client.ElasticsearchHost; import org.springframework.data.elasticsearch.client.ElasticsearchHost.State; import org.springframework.data.elasticsearch.client.NoReachableHostException; +import org.springframework.http.HttpHeaders; import org.springframework.web.reactive.function.client.WebClient; /** @@ -60,20 +60,20 @@ class SingleNodeHostProvider implements HostProvider { .head().uri("/") .headers(httpHeaders -> httpHeaders.addAll(headersSupplier.get())) // .exchange() // - .flatMap(it -> { + .map(it -> { if (it.statusCode().isError()) { state = ElasticsearchHost.offline(endpoint); } else { state = ElasticsearchHost.online(endpoint); } - return Mono.just(state); + return state; }).onErrorResume(throwable -> { state = ElasticsearchHost.offline(endpoint); clientProvider.getErrorListener().accept(throwable); return Mono.just(state); }) // - .flatMap(it -> Mono.just(new ClusterInformation(Collections.singleton(it)))); + .map(it -> new ClusterInformation(Collections.singleton(it))); } /* @@ -96,14 +96,16 @@ class SingleNodeHostProvider implements HostProvider { return Mono.just(endpoint); } - return clusterInfo().flatMap(it -> { + return clusterInfo().handle((information, sink) -> { - ElasticsearchHost host = it.getNodes().iterator().next(); + ElasticsearchHost host = information.getNodes().iterator().next(); if (host.isOnline()) { - return Mono.just(host.getEndpoint()); + + sink.next(host.getEndpoint()); + return; } - return Mono.error(() -> new NoReachableHostException(Collections.singleton(host))); + sink.error(new NoReachableHostException(Collections.singleton(host))); }); }