From 513741bcf6499f715035705157112591be787258 Mon Sep 17 00:00:00 2001 From: Peter-Josef Meisch Date: Sat, 11 Jan 2020 21:13:42 +0100 Subject: [PATCH] DATAES-701 - Enable proxy support for the reactive rest client. Original PR: #380 --- .../DefaultReactiveElasticsearchClient.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index 8c1466cc4..a6835f66b 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -25,6 +25,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; +import reactor.netty.tcp.ProxyProvider; import reactor.netty.tcp.TcpClient; import java.io.IOException; @@ -198,12 +199,22 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch } if (!soTimeout.isNegative()) { - tcpClient = tcpClient.doOnConnected(connection -> connection // .addHandlerLast(new ReadTimeoutHandler(soTimeout.toMillis(), TimeUnit.MILLISECONDS)) .addHandlerLast(new WriteTimeoutHandler(soTimeout.toMillis(), TimeUnit.MILLISECONDS))); } + if (clientConfiguration.getProxy().isPresent()) { + String proxy = clientConfiguration.getProxy().get(); + String[] hostPort = proxy.split(":"); + + if (hostPort.length != 2) { + throw new IllegalArgumentException("invalid proxy configuration " + proxy + ", should be \"host:port\""); + } + tcpClient = tcpClient.proxy(proxyOptions -> proxyOptions.type(ProxyProvider.Proxy.HTTP).host(hostPort[0]) + .port(Integer.parseInt(hostPort[1]))); + } + String scheme = "http"; HttpClient httpClient = HttpClient.from(tcpClient);