DATAES-488 - Add reactive Elasticsearch client support.

Initial implementation of a ReactiveElasticsearchClient using WebClient to connect to cluster nodes.

ReactiveElasticsearchClient client = ElasticsearchClients.createClient()
  .connectedTo("http://localhost:9200", "http://localhost:9201")
  .reactive();
A HostProvider selects active nodes and routes requests.

client.index(request ->

  request.index("spring-data")
    .type("elasticsearch")
    .id(randomUUID().toString())
    .source(singletonMap("feature", "reactive-client"))
    .setRefreshPolicy(IMMEDIATE);
);
This implementation provides the first building block for reactive Template and Repository support to be added subsequently.

Along the lines we upgraded to Elasticsearch 6.5.

Original Pull Request: #226
This commit is contained in:
Christoph Strobl 2018-10-09 13:45:06 +02:00
parent 25b02f29a7
commit 691a8c57bc
59 changed files with 5566 additions and 35 deletions

4
.gitignore vendored
View File

@ -1,3 +1,7 @@
.DS_Store
*.graphml
.springBeans
atlassian-ide-plugin.xml
## Ignore svn files

20
pom.xml
View File

@ -19,7 +19,7 @@
<properties>
<commonscollections>3.2.1</commonscollections>
<commonslang>2.6</commonslang>
<elasticsearch>6.3.0</elasticsearch>
<elasticsearch>6.5.0</elasticsearch>
<log4j>2.9.1</log4j>
<springdata.commons>2.2.0.BUILD-SNAPSHOT</springdata.commons>
<java-module-name>spring.data.elasticsearch</java-module-name>
@ -51,6 +51,24 @@
<version>${springdata.commons}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId>
<version>5.1.0.RELEASE</version>
</dependency>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>0.8.0.RELEASE</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.2.0.RELEASE</version>
</dependency>
<!-- APACHE -->
<dependency>
<groupId>commons-lang</groupId>

View File

@ -0,0 +1,197 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.http.HttpHeaders;
import org.springframework.util.Assert;
/**
* Utility class for common access to Elasticsearch clients. {@link ElasticsearchClients} consolidates set up routines
* for the various drivers into a single place.
*
* @author Christoph Strobl
* @since 4.0
*/
public final class ElasticsearchClients {
private ElasticsearchClients() {}
/**
* Start here to create a new client tailored to your needs.
*
* @return new instance of {@link ClientBuilderWithRequiredHost}.
*/
public static ClientBuilderWithRequiredHost createClient() {
return new ElasticsearchClientBuilderImpl();
}
/**
* @author Christoph Strobl
*/
public interface ElasticsearchClientBuilder {
/**
* Apply the configuration to create a {@link ReactiveElasticsearchClient}.
*
* @return new instance of {@link ReactiveElasticsearchClient}.
*/
ReactiveElasticsearchClient reactive();
/**
* Apply the configuration to create a {@link RestHighLevelClient}.
*
* @return new instance of {@link RestHighLevelClient}.
*/
RestHighLevelClient rest();
/**
* Apply the configuration to create a {@link RestClient}.
*
* @return new instance of {@link RestClient}.
*/
default RestClient lowLevelRest() {
return rest().getLowLevelClient();
}
}
/**
* @author Christoph Strobl
*/
public interface ClientBuilderWithRequiredHost {
/**
* @param host the {@literal host} and {@literal port} formatted as String {@literal host:port}. You may leave out
* {@literal http / https} and use {@link MaybeSecureClientBuilder#viaSsl() viaSsl}.
* @return the {@link MaybeSecureClientBuilder}.
*/
default MaybeSecureClientBuilder connectedTo(String host) {
return connectedTo(new String[] { host });
}
/**
* @param hosts the list of {@literal host} and {@literal port} combinations formatted as String
* {@literal host:port}. You may leave out {@literal http / https} and use
* {@link MaybeSecureClientBuilder#viaSsl() viaSsl}.
* @return the {@link MaybeSecureClientBuilder}.
*/
MaybeSecureClientBuilder connectedTo(String... hosts);
/**
* Obviously for testing.
*
* @return the {@link MaybeSecureClientBuilder}.
*/
default MaybeSecureClientBuilder connectedToLocalhost() {
return connectedTo("localhost:9200");
}
}
/**
* @author Christoph Strobl
*/
public interface MaybeSecureClientBuilder extends ClientBuilderWithOptionalDefaultHeaders {
/**
* Connect via {@literal https} <br />
* <strong>NOTE</strong> You need to leave out the protocol in
* {@link ClientBuilderWithRequiredHost#connectedTo(String)}.
*
* @return the {@link ClientBuilderWithOptionalDefaultHeaders}.
*/
ClientBuilderWithOptionalDefaultHeaders viaSsl();
}
/**
* @author Christoph Strobl
*/
public interface ClientBuilderWithOptionalDefaultHeaders extends ElasticsearchClientBuilder {
/**
* @param defaultHeaders
* @return the {@link ElasticsearchClientBuilder}
*/
ElasticsearchClientBuilder withDefaultHeaders(HttpHeaders defaultHeaders);
}
private static class ElasticsearchClientBuilderImpl
implements ElasticsearchClientBuilder, ClientBuilderWithRequiredHost, MaybeSecureClientBuilder {
private List<String> hosts = new ArrayList<>();
private HttpHeaders headers = HttpHeaders.EMPTY;
private String protocoll = "http";
@Override
public ReactiveElasticsearchClient reactive() {
return DefaultReactiveElasticsearchClient.create(headers, formattedHosts().toArray(new String[0]));
}
@Override
public RestHighLevelClient rest() {
HttpHost[] httpHosts = formattedHosts().stream().map(HttpHost::create).toArray(HttpHost[]::new);
RestClientBuilder builder = RestClient.builder(httpHosts);
if (!headers.isEmpty()) {
Header[] httpHeaders = headers.toSingleValueMap().entrySet().stream()
.map(it -> new BasicHeader(it.getKey(), it.getValue())).toArray(Header[]::new);
builder = builder.setDefaultHeaders(httpHeaders);
}
return new RestHighLevelClient(builder);
}
@Override
public MaybeSecureClientBuilder connectedTo(String... hosts) {
Assert.notEmpty(hosts, "At least one host is required.");
this.hosts.addAll(Arrays.asList(hosts));
return this;
}
@Override
public ClientBuilderWithOptionalDefaultHeaders withDefaultHeaders(HttpHeaders defaultHeaders) {
Assert.notNull(defaultHeaders, "DefaultHeaders must not be null!");
this.headers = defaultHeaders;
return this;
}
List<String> formattedHosts() {
return hosts.stream().map(it -> it.startsWith("http") ? it : protocoll + "://" + it).collect(Collectors.toList());
}
@Override
public ClientBuilderWithOptionalDefaultHeaders viaSsl() {
this.protocoll = "https";
return this;
}
}
}

View File

@ -0,0 +1,92 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client;
import java.time.Instant;
/**
* Value Object containing information about Elasticsearch cluster nodes.
*
* @author Christoph Strobl
* @since 4.0
*/
public class ElasticsearchHost {
private final String host;
private final State state;
private final Instant timestamp;
public ElasticsearchHost(String host, State state) {
this.host = host;
this.state = state;
this.timestamp = Instant.now();
}
/**
* @param host must not be {@literal null}.
* @return new instance of {@link ElasticsearchHost}.
*/
public static ElasticsearchHost online(String host) {
return new ElasticsearchHost(host, State.ONLINE);
}
/**
* @param host must not be {@literal null}.
* @return new instance of {@link ElasticsearchHost}.
*/
public static ElasticsearchHost offline(String host) {
return new ElasticsearchHost(host, State.OFFLINE);
}
/**
* @return {@literal true} if the last known {@link State} was {@link State#ONLINE}
*/
public boolean isOnline() {
return State.ONLINE.equals(state);
}
/**
* @return never {@literal null}.
*/
public String getHost() {
return host;
}
/**
* @return the last known {@link State}.
*/
public State getState() {
return state;
}
/**
* @return the {@link Instant} the information was captured.
*/
public Instant getTimestamp() {
return timestamp;
}
@Override
public String toString() {
return "ElasticsearchHost(" + host + ", " + state.name() + ")";
}
public enum State {
ONLINE, OFFLINE, UNKNOWN
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client;
import java.util.Set;
/**
* {@link RuntimeException} to be emitted / thrown when the cluster is down (aka none of the known nodes is reachable).
*
* @author Christoph Strobl
* @since 4.0
*/
public class NoReachableHostException extends RuntimeException {
public NoReachableHostException(Set<ElasticsearchHost> hosts) {
super(createMessage(hosts));
}
public NoReachableHostException(Set<ElasticsearchHost> hosts, Throwable cause) {
super(createMessage(hosts), cause);
}
private static String createMessage(Set<ElasticsearchHost> hosts) {
if (hosts.size() == 1) {
return String.format("Host '%s' not reachable. Cluster state is offline.", hosts.iterator().next().getHost());
}
return String.format("No active host found in cluster. (%s) of (%s) nodes offline.", hosts.size(), hosts.size());
}
}

View File

@ -15,11 +15,15 @@
*/
package org.springframework.data.elasticsearch.client;
import static java.util.Arrays.*;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.logging.LogConfigurator;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.InternalSettingsPreparer;
import org.elasticsearch.node.Node;
@ -32,8 +36,6 @@ import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.StringUtils;
import static java.util.Arrays.*;
/**
* NodeClientFactoryBean
*
@ -41,7 +43,6 @@ import static java.util.Arrays.*;
* @author Mohsin Husen
* @author Ilkang Na
*/
public class NodeClientFactoryBean implements FactoryBean<Client>, InitializingBean, DisposableBean {
private static final Logger logger = LoggerFactory.getLogger(NodeClientFactoryBean.class);
@ -54,13 +55,22 @@ public class NodeClientFactoryBean implements FactoryBean<Client>, InitializingB
private String pathConfiguration;
public static class TestNode extends Node {
public TestNode(Settings preparedSettings, Collection<Class<? extends Plugin>> classpathPlugins) {
super(InternalSettingsPreparer.prepareEnvironment(preparedSettings, null), classpathPlugins);
super(InternalSettingsPreparer.prepareEnvironment(preparedSettings, null), classpathPlugins, false);
}
protected void registerDerivedNodeNameWithLogger(String nodeName) {
try {
LogConfigurator.setNodeName(nodeName);
} catch (Exception e) {
// nagh - just forget about it
}
}
}
NodeClientFactoryBean() {
}
NodeClientFactoryBean() {}
public NodeClientFactoryBean(boolean local) {
this.local = local;
@ -84,22 +94,18 @@ public class NodeClientFactoryBean implements FactoryBean<Client>, InitializingB
@Override
public void afterPropertiesSet() throws Exception {
nodeClient = (NodeClient) new TestNode(
Settings.builder().put(loadConfig())
.put("transport.type", "netty4")
.put("http.type", "netty4")
.put("path.home", this.pathHome)
.put("path.data", this.pathData)
.put("cluster.name", this.clusterName)
.put("node.max_local_storage_nodes", 100)
.build(), asList(Netty4Plugin.class)).start().client();
nodeClient = (NodeClient) new TestNode(Settings.builder().put(loadConfig()).put("transport.type", "netty4")
.put("http.type", "netty4").put("path.home", this.pathHome).put("path.data", this.pathData)
.put("cluster.name", this.clusterName).put("node.max_local_storage_nodes", 100).build(),
asList(Netty4Plugin.class)).start().client();
}
private Settings loadConfig() throws IOException {
if (!StringUtils.isEmpty(pathConfiguration)) {
InputStream stream = getClass().getClassLoader().getResourceAsStream(pathConfiguration);
if (stream != null) {
return Settings.builder().loadFromStream(pathConfiguration, getClass().getClassLoader().getResourceAsStream(pathConfiguration), false).build();
return Settings.builder().loadFromStream(pathConfiguration,
getClass().getClassLoader().getResourceAsStream(pathConfiguration), false).build();
}
logger.error(String.format("Unable to read node configuration from file [%s]", pathConfiguration));
}

View File

@ -0,0 +1,518 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client.reactive;
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
import org.springframework.data.elasticsearch.client.NoReachableHostException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.main.MainRequest;
import org.elasticsearch.action.main.MainResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Request;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.reactivestreams.Publisher;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.data.elasticsearch.client.reactive.HostProvider.VerificationMode;
import org.springframework.data.elasticsearch.client.util.RequestConverters;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseCookie;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.util.Assert;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StreamUtils;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.reactive.function.BodyExtractor;
import org.springframework.web.reactive.function.BodyExtractors;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClient.RequestBodySpec;
import org.springframework.web.reactive.function.client.WebClientException;
/**
* A {@link WebClient} based {@link ReactiveElasticsearchClient} that connects to an Elasticsearch cluster through HTTP.
*
* @author Christoph Strobl
* @since 4.0
*/
public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient {
private final HostProvider hostProvider;
/**
* Create a new {@link DefaultReactiveElasticsearchClient} using the given hostProvider to obtain server connections.
*
* @param hostProvider must not be {@literal null}.
*/
public DefaultReactiveElasticsearchClient(HostProvider hostProvider) {
this.hostProvider = hostProvider;
}
/**
* Create a new {@link DefaultReactiveElasticsearchClient} aware of the given nodes in the cluster. <br />
* <strong>NOTE</strong> If the cluster requires authentication be sure to provide the according {@link HttpHeaders}
* correctly.
*
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
* @param hosts must not be {@literal null} nor empty!
* @return new instance of {@link DefaultReactiveElasticsearchClient}.
*/
public static ReactiveElasticsearchClient create(HttpHeaders headers, String... hosts) {
Assert.notEmpty(hosts, "Elasticsearch Cluster needs to consist of at least one host");
HostProvider hostProvider = HostProvider.provider(hosts);
return new DefaultReactiveElasticsearchClient(
headers.isEmpty() ? hostProvider : hostProvider.withDefaultHeaders(headers));
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders)
*/
@Override
public Mono<Boolean> ping(HttpHeaders headers) {
return sendRequest(new MainRequest(), RequestCreator.ping(), RawActionResponse.class, headers) //
.map(response -> response.statusCode().is2xxSuccessful()) //
.onErrorResume(NoReachableHostException.class, error -> Mono.just(false)).next();
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#info(org.springframework.http.HttpHeaders)
*/
@Override
public Mono<MainResponse> info(HttpHeaders headers) {
return sendRequest(new MainRequest(), RequestCreator.info(), MainResponse.class, headers) //
.next();
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#get(org.springframework.http.HttpHeaderss, org.elasticsearch.action.get.GetRequest)
*/
@Override
public Mono<GetResult> get(HttpHeaders headers, GetRequest getRequest) {
return sendRequest(getRequest, RequestCreator.get(), GetResponse.class, headers) //
.filter(GetResponse::isExists) //
.map(DefaultReactiveElasticsearchClient::getResponseToGetResult) //
.next();
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#multiGet(org.springframework.http.HttpHeaders, org.elasticsearch.action.get.MultiGetRequest)
*/
@Override
public Flux<GetResult> multiGet(HttpHeaders headers, MultiGetRequest multiGetRequest) {
return sendRequest(multiGetRequest, RequestCreator.multiGet(), MultiGetResponse.class, headers)
.map(MultiGetResponse::getResponses) //
.flatMap(Flux::fromArray) //
.filter(it -> !it.isFailed() && it.getResponse().isExists()) //
.map(it -> DefaultReactiveElasticsearchClient.getResponseToGetResult(it.getResponse()));
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#exists(org.springframework.http.HttpHeaders, org.elasticsearch.action.get.GetRequest)
*/
@Override
public Mono<Boolean> exists(HttpHeaders headers, GetRequest getRequest) {
return sendRequest(getRequest, RequestCreator.exists(), RawActionResponse.class, headers) //
.map(response -> {
if (response.statusCode().is2xxSuccessful()) {
return true;
}
if (response.statusCode().is5xxServerError()) {
throw new HttpClientErrorException(response.statusCode(), String.format(
"Exists request (%s) returned error code %s.", getRequest.toString(), response.statusCode().value()));
}
return false;
}) //
.next();
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.index.IndexRequest)
*/
@Override
public Mono<IndexResponse> index(HttpHeaders headers, IndexRequest indexRequest) {
return sendRequest(indexRequest, RequestCreator.index(), IndexResponse.class, headers).publishNext();
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.update.UpdateRequest)
*/
@Override
public Mono<UpdateResponse> update(HttpHeaders headers, UpdateRequest updateRequest) {
return sendRequest(updateRequest, RequestCreator.update(), UpdateResponse.class, headers).publishNext();
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.delete.DeleteRequest)
*/
@Override
public Mono<DeleteResponse> delete(HttpHeaders headers, DeleteRequest deleteRequest) {
return sendRequest(deleteRequest, RequestCreator.delete(), DeleteResponse.class, headers).publishNext();
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
*/
@Override
public Flux<SearchHit> search(HttpHeaders headers, SearchRequest searchRequest) {
return sendRequest(searchRequest, RequestCreator.search(), SearchResponse.class, headers) //
.map(SearchResponse::getHits) //
.flatMap(Flux::fromIterable);
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.ReactiveElasticsearchClientCallback)
*/
@Override
public Mono<ClientResponse> execute(ReactiveElasticsearchClientCallback callback) {
return this.hostProvider.getActive(VerificationMode.LAZY) //
.flatMap(it -> callback.doWithClient(it)) //
.onErrorResume(throwable -> {
if (throwable instanceof ConnectException) {
return hostProvider.getActive(VerificationMode.FORCE) //
.flatMap(webClient -> callback.doWithClient(webClient));
}
return Mono.error(throwable);
});
}
@Override
public Mono<Status> status() {
return hostProvider.clusterInfo() //
.map(it -> new ClientStatus(it.getNodes()));
}
// --> Private Response helpers
private static GetResult getResponseToGetResult(GetResponse response) {
return new GetResult(response.getIndex(), response.getType(), response.getId(), response.getVersion(),
response.isExists(), response.getSourceAsBytesRef(), response.getFields());
}
// -->
private <Req extends ActionRequest, Resp extends ActionResponse> Flux<Resp> sendRequest(Req request,
Function<Req, Request> converter, Class<Resp> responseType, HttpHeaders headers) {
return sendRequest(converter.apply(request), responseType, headers);
}
private <AR extends ActionResponse> Flux<AR> sendRequest(Request request, Class<AR> responseType,
HttpHeaders headers) {
return execute(webClient -> sendRequest(webClient, request, headers))
.flatMapMany(response -> readResponseBody(request, response, responseType));
}
private Mono<ClientResponse> sendRequest(WebClient webClient, Request request, HttpHeaders headers) {
RequestBodySpec requestBodySpec = webClient.method(HttpMethod.valueOf(request.getMethod().toUpperCase())) //
.uri(request.getEndpoint(), request.getParameters()) //
.headers(theHeaders -> theHeaders.addAll(headers));
if (request.getEntity() != null) {
requestBodySpec.contentType(MediaType.valueOf(request.getEntity().getContentType().getValue()));
requestBodySpec.body(bodyExtractor(request), String.class);
}
return requestBodySpec //
.exchange() //
.onErrorReturn(ConnectException.class, ClientResponse.create(HttpStatus.SERVICE_UNAVAILABLE).build());
}
private Publisher<String> bodyExtractor(Request request) {
return Mono.fromSupplier(() -> {
try {
return EntityUtils.toString(request.getEntity());
} catch (IOException e) {
throw new RequestBodyEncodingException("Error encoding request", e);
}
});
}
private <T> Publisher<? extends T> readResponseBody(Request request, ClientResponse response, Class<T> responseType) {
if (RawActionResponse.class.equals(responseType)) {
return Mono.just((T) new RawActionResponse(response));
}
if (response.statusCode().is5xxServerError()) {
throw new HttpClientErrorException(response.statusCode(),
String.format("%s request to %s returned error code %s.", request.getMethod(), request.getEndpoint(),
response.statusCode().value()));
}
return response.body(BodyExtractors.toDataBuffers()).flatMap(it -> {
try {
String content = StreamUtils.copyToString(it.asInputStream(true), StandardCharsets.UTF_8);
try {
XContentParser contentParser = XContentType
.fromMediaTypeOrFormat(
response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType()))
.xContent()
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, content);
Method fromXContent = ReflectionUtils.findMethod(responseType, "fromXContent", XContentParser.class);
return Mono.just((T) ReflectionUtils.invokeMethod(fromXContent, responseType, contentParser));
} catch (Exception parseFailure) {
try {
XContentParser errorParser = XContentType
.fromMediaTypeOrFormat(
response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType()))
.xContent()
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, content);
// return Mono.error to avoid ElasticsearchStatusException to be caught by outer catch.
return Mono.error(BytesRestResponse.errorFromXContent(errorParser));
} catch (Exception errorParseFailure) {
// return Mono.error to avoid ElasticsearchStatusException to be caught by outer catch.
return Mono.error(new ElasticsearchStatusException("Unable to parse response body",
RestStatus.fromCode(response.statusCode().value())));
}
}
} catch (IOException e) {
throw new DataAccessResourceFailureException("Error parsing XContent.", e);
}
});
}
static class RequestCreator {
static Function<SearchRequest, Request> search() {
return RequestConverters::search;
}
static Function<IndexRequest, Request> index() {
return RequestConverters::index;
}
static Function<GetRequest, Request> get() {
return RequestConverters::get;
}
static Function<MainRequest, Request> ping() {
return (request) -> RequestConverters.ping();
}
static Function<MainRequest, Request> info() {
return (request) -> RequestConverters.info();
}
static Function<MultiGetRequest, Request> multiGet() {
return RequestConverters::multiGet;
}
static Function<GetRequest, Request> exists() {
return RequestConverters::exists;
}
static Function<UpdateRequest, Request> update() {
return RequestConverters::update;
}
static Function<DeleteRequest, Request> delete() {
return RequestConverters::delete;
}
}
public static class RequestBodyEncodingException extends WebClientException {
RequestBodyEncodingException(String msg, Throwable ex) {
super(msg, ex);
}
}
static class RawActionResponse extends ActionResponse implements ClientResponse {
final ClientResponse delegate;
RawActionResponse(ClientResponse delegate) {
this.delegate = delegate;
}
public HttpStatus statusCode() {
return delegate.statusCode();
}
public int rawStatusCode() {
return delegate.rawStatusCode();
}
public Headers headers() {
return delegate.headers();
}
public MultiValueMap<String, ResponseCookie> cookies() {
return delegate.cookies();
}
public ExchangeStrategies strategies() {
return delegate.strategies();
}
public <T> T body(BodyExtractor<T, ? super ClientHttpResponse> extractor) {
return delegate.body(extractor);
}
public <T> Mono<T> bodyToMono(Class<? extends T> elementClass) {
return delegate.bodyToMono(elementClass);
}
public <T> Mono<T> bodyToMono(ParameterizedTypeReference<T> typeReference) {
return delegate.bodyToMono(typeReference);
}
public <T> Flux<T> bodyToFlux(Class<? extends T> elementClass) {
return delegate.bodyToFlux(elementClass);
}
public <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> typeReference) {
return delegate.bodyToFlux(typeReference);
}
public <T> Mono<ResponseEntity<T>> toEntity(Class<T> bodyType) {
return delegate.toEntity(bodyType);
}
public <T> Mono<ResponseEntity<T>> toEntity(ParameterizedTypeReference<T> typeReference) {
return delegate.toEntity(typeReference);
}
public <T> Mono<ResponseEntity<List<T>>> toEntityList(Class<T> elementType) {
return delegate.toEntityList(elementType);
}
public <T> Mono<ResponseEntity<List<T>>> toEntityList(ParameterizedTypeReference<T> typeReference) {
return delegate.toEntityList(typeReference);
}
public static Builder from(ClientResponse other) {
return ClientResponse.from(other);
}
public static Builder create(HttpStatus statusCode) {
return ClientResponse.create(statusCode);
}
public static Builder create(HttpStatus statusCode, ExchangeStrategies strategies) {
return ClientResponse.create(statusCode, strategies);
}
public static Builder create(HttpStatus statusCode, List<HttpMessageReader<?>> messageReaders) {
return ClientResponse.create(statusCode, messageReaders);
}
}
/**
* Reactive client {@link ReactiveElasticsearchClient.Status} implementation.
*
* @author Christoph Strobl
*/
class ClientStatus implements Status {
private final Collection<ElasticsearchHost> connectedHosts;
ClientStatus(Collection<ElasticsearchHost> connectedHosts) {
this.connectedHosts = connectedHosts;
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Status#hosts()
*/
@Override
public Collection<ElasticsearchHost> hosts() {
return connectedHosts;
}
}
}

View File

@ -0,0 +1,184 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client.reactive;
import org.springframework.data.elasticsearch.client.NoReachableHostException;
import reactor.core.publisher.Mono;
import java.util.Collections;
import java.util.Set;
import java.util.function.Consumer;
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
import org.springframework.http.HttpHeaders;
import org.springframework.util.Assert;
import org.springframework.web.reactive.function.client.WebClient;
/**
* Infrastructure helper class aware of hosts within the cluster and the health of those allowing easy selection of
* active ones.
*
* @author Christoph Strobl
* @since 4.0
*/
public interface HostProvider {
/**
* Lookup an active host in {@link VerificationMode#LAZY lazy} mode utilizing cached {@link ElasticsearchHost}.
*
* @return the {@link Mono} emitting the active host or {@link Mono#error(Throwable) an error} if none found.
*/
default Mono<String> lookupActiveHost() {
return lookupActiveHost(VerificationMode.LAZY);
}
/**
* Lookup an active host in using the given {@link VerificationMode}.
*
* @param verificationMode
* @return the {@link Mono} emitting the active host or {@link Mono#error(Throwable) an error}
* ({@link NoReachableHostException}) if none found.
*/
Mono<String> lookupActiveHost(VerificationMode verificationMode);
/**
* Get the {@link WebClient} connecting to an active host utilizing cached {@link ElasticsearchHost}.
*
* @return the {@link Mono} emitting the client for an active host or {@link Mono#error(Throwable) an error} if none
* found.
*/
default Mono<WebClient> getActive() {
return getActive(VerificationMode.LAZY);
}
/**
* Get the {@link WebClient} connecting to an active host.
*
* @param verificationMode must not be {@literal null}.
* @return the {@link Mono} emitting the client for an active host or {@link Mono#error(Throwable) an error} if none
* found.
*/
default Mono<WebClient> getActive(VerificationMode verificationMode) {
return getActive(verificationMode, getDefaultHeaders());
}
/**
* Get the {@link WebClient} with default {@link HttpHeaders} connecting to an active host.
*
* @param verificationMode must not be {@literal null}.
* @param headers must not be {@literal null}.
* @return the {@link Mono} emitting the client for an active host or {@link Mono#error(Throwable) an error} if none
* found.
*/
default Mono<WebClient> getActive(VerificationMode verificationMode, HttpHeaders headers) {
return lookupActiveHost(verificationMode).map(host -> createWebClient(host, headers));
}
/**
* Get the {@link WebClient} with default {@link HttpHeaders} connecting to the given host.
*
* @param host must not be {@literal null}.
* @param headers must not be {@literal null}.
* @return
*/
default WebClient createWebClient(String host, HttpHeaders headers) {
return WebClient.builder().baseUrl(host).defaultHeaders(defaultHeaders -> defaultHeaders.putAll(headers)).build();
}
/**
* Obtain information about known cluster nodes.
*
* @return the {@link Mono} emitting {@link ClusterInformation} when available.
*/
Mono<ClusterInformation> clusterInfo();
/**
* Obtain the {@link HttpHeaders} to be used by default.
*
* @return never {@literal null}. {@link HttpHeaders#EMPTY} by default.
*/
HttpHeaders getDefaultHeaders();
/**
* Create a new instance of {@link HostProvider} applying the given headers by default.
*
* @param headers must not be {@literal null}.
* @return new instance of {@link HostProvider}.
*/
HostProvider withDefaultHeaders(HttpHeaders headers);
/**
* Create a new instance of {@link HostProvider} calling the given {@link Consumer} on error.
*
* @param errorListener must not be {@literal null}.
* @return new instance of {@link HostProvider}.
*/
HostProvider withErrorListener(Consumer<Throwable> errorListener);
/**
* Create a new {@link HostProvider} best suited for the given number of hosts.
*
* @param hosts must not be {@literal null} nor empty.
* @return new instance of {@link HostProvider}.
*/
static HostProvider provider(String... hosts) {
Assert.notEmpty(hosts, "Please provide at least one host to connect to.");
if (hosts.length == 1) {
return new SingleNodeHostProvider(HttpHeaders.EMPTY, (err) -> {}, hosts[0]);
} else {
return new MultiNodeHostProvider(HttpHeaders.EMPTY, (err) -> {}, hosts);
}
}
/**
* @author Christoph Strobl
* @since 4.0
*/
enum VerificationMode {
/**
* Actively check for cluster node health.
*/
FORCE,
/**
* Use cached data for cluster node health.
*/
LAZY
}
/**
* Value object accumulating information about cluster an Elasticsearch cluster.
*
* @author Christoph Strobll
* @since 4.0.
*/
class ClusterInformation {
private final Set<ElasticsearchHost> nodes;
public ClusterInformation(Set<ElasticsearchHost> nodes) {
this.nodes = nodes;
}
public Set<ElasticsearchHost> getNodes() {
return Collections.unmodifiableSet(nodes);
}
}
}

View File

@ -0,0 +1,152 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client.reactive;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
import org.springframework.data.elasticsearch.client.ElasticsearchHost.State;
import org.springframework.data.elasticsearch.client.NoReachableHostException;
import org.springframework.http.HttpHeaders;
import org.springframework.lang.Nullable;
import org.springframework.web.reactive.function.client.ClientResponse;
/**
* @author Christoph Strobl
* @since 4.0
*/
class MultiNodeHostProvider implements HostProvider {
private final HttpHeaders headers;
private final Consumer<Throwable> errorListener;
private final Map<String, ElasticsearchHost> hosts;
MultiNodeHostProvider(HttpHeaders headers, Consumer<Throwable> errorListener, String... hosts) {
this.headers = headers;
this.errorListener = errorListener;
this.hosts = new ConcurrentHashMap<>();
for (String host : hosts) {
this.hosts.put(host, new ElasticsearchHost(host, State.UNKNOWN));
}
}
public Mono<ClusterInformation> clusterInfo() {
return nodes(null).map(this::updateNodeState).buffer(hosts.size())
.then(Mono.just(new ClusterInformation(new LinkedHashSet<>(this.hosts.values()))));
}
Collection<ElasticsearchHost> getCachedHostState() {
return hosts.values();
}
@Override
public HttpHeaders getDefaultHeaders() {
return this.headers;
}
@Override
public Mono<String> lookupActiveHost(VerificationMode verificationMode) {
if (VerificationMode.LAZY.equals(verificationMode)) {
for (ElasticsearchHost entry : hosts()) {
if (entry.isOnline()) {
return Mono.just(entry.getHost());
}
}
}
return findActiveHostInKnownActives() //
.switchIfEmpty(findActiveHostInUnresolved()) //
.switchIfEmpty(findActiveHostInDead()) //
.switchIfEmpty(Mono.error(() -> new NoReachableHostException(new LinkedHashSet<>(getCachedHostState()))));
}
@Override
public HostProvider withDefaultHeaders(HttpHeaders headers) {
return new MultiNodeHostProvider(headers, errorListener, hosts.keySet().toArray(new String[0]));
}
@Override
public HostProvider withErrorListener(Consumer<Throwable> errorListener) {
return new MultiNodeHostProvider(headers, errorListener, hosts.keySet().toArray(new String[0]));
}
private Mono<String> findActiveHostInKnownActives() {
return findActiveForSate(State.ONLINE);
}
private Mono<String> findActiveHostInUnresolved() {
return findActiveForSate(State.UNKNOWN);
}
private Mono<String> findActiveHostInDead() {
return findActiveForSate(State.OFFLINE);
}
private Mono<String> findActiveForSate(State state) {
return nodes(state).map(this::updateNodeState).filter(ElasticsearchHost::isOnline).map(it -> it.getHost()).next();
}
private ElasticsearchHost updateNodeState(Tuple2<String, ClientResponse> tuple2) {
State state = tuple2.getT2().statusCode().isError() ? State.OFFLINE : State.ONLINE;
ElasticsearchHost elasticsearchHost = new ElasticsearchHost(tuple2.getT1(), state);
hosts.put(tuple2.getT1(), elasticsearchHost);
return elasticsearchHost;
}
private Flux<Tuple2<String, ClientResponse>> nodes(@Nullable State state) {
return Flux.fromIterable(hosts()) //
.filter(entry -> state != null ? entry.getState().equals(state) : true) //
.map(ElasticsearchHost::getHost) //
.flatMap(host -> {
Mono<ClientResponse> exchange = createWebClient(host, headers) //
.head().uri("/").exchange().doOnError(throwable -> {
hosts.put(host, new ElasticsearchHost(host, State.OFFLINE));
errorListener.accept(throwable);
});
return Mono.just(host).zipWith(exchange);
}) //
.onErrorContinue((throwable, o) -> {
errorListener.accept(throwable);
});
}
private List<ElasticsearchHost> hosts() {
List<ElasticsearchHost> hosts = new ArrayList<>(this.hosts.values());
Collections.shuffle(hosts);
return hosts;
}
}

View File

@ -0,0 +1,407 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client.reactive;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.net.ConnectException;
import java.util.Collection;
import java.util.function.Consumer;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.main.MainResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.search.SearchHit;
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
import org.springframework.http.HttpHeaders;
import org.springframework.util.CollectionUtils;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
/**
* A reactive client to connect to Elasticsearch. <br />
*
* @author Christoph Strobl
* @since 4.0
*/
public interface ReactiveElasticsearchClient {
/**
* Pings the remote Elasticsearch cluster and emits {@literal true} if the ping succeeded, {@literal false} otherwise.
*
* @return the {@link Mono} emitting the result of the ping attempt.
*/
default Mono<Boolean> ping() {
return ping(HttpHeaders.EMPTY);
}
/**
* Pings the remote Elasticsearch cluster and emits {@literal true} if the ping succeeded, {@literal false} otherwise.
*
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
* @return the {@link Mono} emitting the result of the ping attempt.
*/
Mono<Boolean> ping(HttpHeaders headers);
/**
* Get the cluster info otherwise provided when sending an HTTP request to port 9200.
*
* @return the {@link Mono} emitting the result of the info request.
*/
default Mono<MainResponse> info() {
return info(HttpHeaders.EMPTY);
}
/**
* Get the cluster info otherwise provided when sending an HTTP request to port 9200.
*
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
* @return the {@link Mono} emitting the result of the info request.
*/
Mono<MainResponse> info(HttpHeaders headers);
/**
* Execute the given {@link GetRequest} against the {@literal get} API to retrieve a document by id.
*
* @param getRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html">Get API on
* elastic.co</a>
* @return the {@link Mono} emitting the {@link GetResult result}.
*/
default Mono<GetResult> get(GetRequest getRequest) {
return get(HttpHeaders.EMPTY, getRequest);
}
/**
* Execute a {@link GetRequest} against the {@literal get} API to retrieve a document by id.
*
* @param consumer never {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html">Get API on
* elastic.co</a>
* @return the {@link Mono} emitting the {@link GetResult result}.
*/
default Mono<GetResult> get(Consumer<GetRequest> consumer) {
GetRequest request = new GetRequest();
consumer.accept(request);
return get(request);
}
/**
* Execute the given {@link GetRequest} against the {@literal get} API to retrieve a document by id.
*
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
* @param getRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html">Get API on
* elastic.co</a>
* @return the {@link Mono} emitting the {@link GetResult result}.
*/
Mono<GetResult> get(HttpHeaders headers, GetRequest getRequest);
/**
* Execute the given {@link MultiGetRequest} against the {@literal multi-get} API to retrieve multiple documents by
* id.
*
* @param multiGetRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-multi-get.html">Multi Get API on
* elastic.co</a>
* @return the {@link Flux} emitting the {@link GetResult result}.
*/
default Flux<GetResult> multiGet(MultiGetRequest multiGetRequest) {
return multiGet(HttpHeaders.EMPTY, multiGetRequest);
}
/**
* Execute a {@link MultiGetRequest} against the {@literal multi-get} API to retrieve multiple documents by id.
*
* @param consumer never {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-multi-get.html">Multi Get API on
* elastic.co</a>
* @return the {@link Flux} emitting the {@link GetResult result}.
*/
default Flux<GetResult> multiGet(Consumer<MultiGetRequest> consumer) {
MultiGetRequest request = new MultiGetRequest();
consumer.accept(request);
return multiGet(request);
}
/**
* Execute the given {@link MultiGetRequest} against the {@literal multi-get} API to retrieve multiple documents by
* id.
*
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
* @param multiGetRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-multi-get.html">Multi Get API on
* elastic.co</a>
* @return the {@link Flux} emitting the {@link GetResult result}.
*/
Flux<GetResult> multiGet(HttpHeaders headers, MultiGetRequest multiGetRequest);
/**
* Checks for the existence of a document. Emits {@literal true} if it exists, {@literal false} otherwise.
*
* @param getRequest must not be {@literal null}.
* @return the {@link Mono} emitting {@literal true} if it exists, {@literal false} otherwise.
*/
default Mono<Boolean> exists(GetRequest getRequest) {
return exists(HttpHeaders.EMPTY, getRequest);
}
/**
* Checks for the existence of a document. Emits {@literal true} if it exists, {@literal false} otherwise.
*
* @param consumer never {@literal null}.
* @return the {@link Mono} emitting {@literal true} if it exists, {@literal false} otherwise.
*/
default Mono<Boolean> exists(Consumer<GetRequest> consumer) {
GetRequest request = new GetRequest();
consumer.accept(request);
return exists(request);
}
/**
* Checks for the existence of a document. Emits {@literal true} if it exists, {@literal false} otherwise.
*
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
* @param getRequest must not be {@literal null}.
* @return the {@link Mono} emitting {@literal true} if it exists, {@literal false} otherwise.
*/
Mono<Boolean> exists(HttpHeaders headers, GetRequest getRequest);
/**
* Execute the given {@link IndexRequest} against the {@literal index} API to index a document.
*
* @param indexRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index.html">Index API on
* elastic.co</a>
* @return the {@link Mono} emitting the {@link IndexResponse}.
*/
default Mono<IndexResponse> index(IndexRequest indexRequest) {
return index(HttpHeaders.EMPTY, indexRequest);
}
/**
* Execute an {@link IndexRequest} against the {@literal index} API to index a document.
*
* @param consumer never {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index.html">Index API on
* elastic.co</a>
* @return the {@link Mono} emitting the {@link IndexResponse}.
*/
default Mono<IndexResponse> index(Consumer<IndexRequest> consumer) {
IndexRequest request = new IndexRequest();
consumer.accept(request);
return index(request);
}
/**
* Execute the given {@link IndexRequest} against the {@literal index} API to index a document.
*
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
* @param indexRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index.html">Index API on
* elastic.co</a>
* @return the {@link Mono} emitting the {@link IndexResponse}.
*/
Mono<IndexResponse> index(HttpHeaders headers, IndexRequest indexRequest);
/**
* Execute the given {@link UpdateRequest} against the {@literal update} API to alter a document.
*
* @param updateRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html">Update API on
* elastic.co</a>
* @return the {@link Mono} emitting the {@link UpdateResponse}.
*/
default Mono<UpdateResponse> update(UpdateRequest updateRequest) {
return update(HttpHeaders.EMPTY, updateRequest);
}
/**
* Execute an {@link UpdateRequest} against the {@literal update} API to alter a document.
*
* @param consumer never {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html">Update API on
* elastic.co</a>
* @return the {@link Mono} emitting the {@link UpdateResponse}.
*/
default Mono<UpdateResponse> update(Consumer<UpdateRequest> consumer) {
UpdateRequest request = new UpdateRequest();
consumer.accept(request);
return update(request);
}
/**
* Execute the given {@link UpdateRequest} against the {@literal update} API to alter a document.
*
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
* @param updateRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html">Update API on
* elastic.co</a>
* @return the {@link Mono} emitting the {@link UpdateResponse}.
*/
Mono<UpdateResponse> update(HttpHeaders headers, UpdateRequest updateRequest);
/**
* Execute the given {@link DeleteRequest} against the {@literal delete} API to remove a document.
*
* @param deleteRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete.html">Delete API on
* elastic.co</a>
* @return the {@link Mono} emitting the {@link DeleteResponse}.
*/
default Mono<DeleteResponse> delete(DeleteRequest deleteRequest) {
return delete(HttpHeaders.EMPTY, deleteRequest);
}
/**
* Execute a {@link DeleteRequest} against the {@literal delete} API to remove a document.
*
* @param consumer never {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete.html">Delete API on
* elastic.co</a>
* @return the {@link Mono} emitting the {@link DeleteResponse}.
*/
default Mono<DeleteResponse> delete(Consumer<DeleteRequest> consumer) {
DeleteRequest request = new DeleteRequest();
consumer.accept(request);
return delete(request);
}
/**
* Execute the given {@link DeleteRequest} against the {@literal delete} API to remove a document.
*
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
* @param deleteRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete.html">Delete API on
* elastic.co</a>
* @return the {@link Mono} emitting the {@link DeleteResponse}.
*/
Mono<DeleteResponse> delete(HttpHeaders headers, DeleteRequest deleteRequest);
/**
* Execute the given {@link SearchRequest} against the {@literal search} API.
*
* @param searchRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html">Search API on
* elastic.co</a>
* @return the {@link Flux} emitting {@link SearchHit hits} one by one.
*/
default Flux<SearchHit> search(SearchRequest searchRequest) {
return search(HttpHeaders.EMPTY, searchRequest);
}
/**
* Execute a {@link SearchRequest} against the {@literal search} API.
*
* @param consumer never {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html">Search API on
* elastic.co</a>
* @return the {@link Flux} emitting {@link SearchHit hits} one by one.
*/
default Flux<SearchHit> search(Consumer<SearchRequest> consumer) {
SearchRequest request = new SearchRequest();
consumer.accept(request);
return search(request);
}
/**
* Execute the given {@link SearchRequest} against the {@literal search} API.
*
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
* @param searchRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html">Search API on
* elastic.co</a>
* @return the {@link Flux} emitting {@link SearchHit hits} one by one.
*/
Flux<SearchHit> search(HttpHeaders headers, SearchRequest searchRequest);
/**
* Compose the actual command/s to run against Elasticsearch using the underlying {@link WebClient connection}.
* {@link #execute(ReactiveElasticsearchClientCallback) Execute} selects an active server from the available ones and
* retries operations that fail with a {@link ConnectException} on another node if the previously selected one becomes
* unavailable.
*
* @param callback the {@link ReactiveElasticsearchClientCallback callback} wielding the actual command to run.
* @return the {@link Mono} emitting the {@link ClientResponse} once subscribed.
*/
Mono<ClientResponse> execute(ReactiveElasticsearchClientCallback callback);
/**
* Get the current client {@link Status}. <br />
* <strong>NOTE</strong> the actual implementation might choose to actively check the current cluster state by pinging
* known nodes.
*
* @return
*/
Mono<Status> status();
/**
* Low level callback interface operating upon {@link WebClient} to send commands towards elasticsearch.
*
* @author Christoph Strobl
* @since 4.0
*/
interface ReactiveElasticsearchClientCallback {
Mono<ClientResponse> doWithClient(WebClient client);
}
/**
* Cumulative client {@link ElasticsearchHost} information.
*
* @author Christoph Strobl
* @since 4.0
*/
interface Status {
/**
* Get the list of known hosts and their getCachedHostState.
*
* @return never {@literal null}.
*/
Collection<ElasticsearchHost> hosts();
/**
* @return {@literal true} if at least one host is available.
*/
default boolean isOk() {
Collection<ElasticsearchHost> hosts = hosts();
if (CollectionUtils.isEmpty(hosts)) {
return false;
}
return !hosts().stream().filter(it -> !it.isOnline()).findFirst().isPresent();
}
}
}

View File

@ -0,0 +1,107 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client.reactive;
import reactor.core.publisher.Mono;
import java.util.Collections;
import java.util.function.Consumer;
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
import org.springframework.data.elasticsearch.client.ElasticsearchHost.State;
import org.springframework.data.elasticsearch.client.NoReachableHostException;
import org.springframework.http.HttpHeaders;
/**
* @author Christoph Strobl
* @since 4.0
*/
class SingleNodeHostProvider implements HostProvider {
private final HttpHeaders headers;
private final Consumer<Throwable> errorListener;
private final String hostname;
private volatile ElasticsearchHost state;
SingleNodeHostProvider(HttpHeaders headers, Consumer<Throwable> errorListener, String host) {
this.headers = headers;
this.errorListener = errorListener;
this.hostname = host;
this.state = new ElasticsearchHost(hostname, State.UNKNOWN);
}
@Override
public Mono<ClusterInformation> clusterInfo() {
return createWebClient(hostname, headers) //
.head().uri("/").exchange() //
.flatMap(it -> {
if(it.statusCode().isError()) {
state = ElasticsearchHost.offline(hostname);
} else {
state = ElasticsearchHost.online(hostname);
}
return Mono.just(state);
}).onErrorResume(throwable -> {
state = ElasticsearchHost.offline(hostname);
errorListener.accept(throwable);
return Mono.just(state);
}) //
.flatMap(it -> Mono.just(new ClusterInformation(Collections.singleton(it))));
}
@Override
public Mono<String> lookupActiveHost(VerificationMode verificationMode) {
if (VerificationMode.LAZY.equals(verificationMode) && state.isOnline()) {
return Mono.just(hostname);
}
return clusterInfo().flatMap(it -> {
ElasticsearchHost host = it.getNodes().iterator().next();
if (host.isOnline()) {
return Mono.just(host.getHost());
}
return Mono.error(() -> new NoReachableHostException(Collections.singleton(host)));
});
}
@Override
public HttpHeaders getDefaultHeaders() {
return this.headers;
}
@Override
public HostProvider withDefaultHeaders(HttpHeaders headers) {
return new SingleNodeHostProvider(headers, errorListener, hostname);
}
@Override
public HostProvider withErrorListener(Consumer<Throwable> errorListener) {
return new SingleNodeHostProvider(headers, errorListener, hostname);
}
ElasticsearchHost getCachedHostState() {
return state;
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core;
import org.elasticsearch.ElasticsearchException;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.support.PersistenceExceptionTranslator;
/**
* @author Christoph Strobl
* @since 4.0
*/
public class ElasticsearchExceptionTranslator implements PersistenceExceptionTranslator {
@Override
public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
if (ex instanceof ElasticsearchException) {
// TODO: exception translation
}
return null;
}
}

View File

@ -0,0 +1,349 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core;
import static org.elasticsearch.index.VersionType.*;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.reactivestreams.Publisher;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
import org.springframework.data.elasticsearch.core.query.CriteriaQuery;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.mapping.PersistentPropertyAccessor;
import org.springframework.lang.Nullable;
import org.springframework.util.StringUtils;
import org.springframework.web.client.HttpClientErrorException;
/**
* @author Christoph Strobl
* @since 4.0
*/
public class ReactiveElasticsearchTemplate {
private final ReactiveElasticsearchClient client;
private final ElasticsearchConverter converter;
private final DefaultResultMapper mapper;
private final ElasticsearchExceptionTranslator exceptionTranslator;
public ReactiveElasticsearchTemplate(ReactiveElasticsearchClient client) {
this(client, new MappingElasticsearchConverter(new SimpleElasticsearchMappingContext()));
}
public ReactiveElasticsearchTemplate(ReactiveElasticsearchClient client, ElasticsearchConverter converter) {
this.client = client;
this.converter = converter;
this.mapper = new DefaultResultMapper(converter.getMappingContext());
this.exceptionTranslator = new ElasticsearchExceptionTranslator();
}
public <T> Mono<T> index(T entity) {
return index(entity, null);
}
public <T> Mono<T> index(T entity, String index) {
return index(entity, index, null);
}
/**
* Add the given entity to the index.
*
* @param entity
* @param index
* @param type
* @param <T>
* @return
*/
public <T> Mono<T> index(T entity, String index, String type) {
ElasticsearchPersistentEntity<?> persistentEntity = lookupPersistentEntity(entity.getClass());
return doIndex(entity, persistentEntity, index, type) //
.map(it -> {
// TODO: update id if necessary!
// it.getId()
// it.getVersion()
return entity;
});
}
public <T> Mono<T> get(String id, Class<T> resultType) {
return get(id, resultType, null);
}
public <T> Mono<T> get(String id, Class<T> resultType, @Nullable String index) {
return get(id, resultType, index, null);
}
/**
* Fetch the entity with given id.
*
* @param id must not be {@literal null}.
* @param resultType must not be {@literal null}.
* @param index
* @param type
* @param <T>
* @return the {@link Mono} emitting the entity or signalling completion if none found.
*/
public <T> Mono<T> get(String id, Class<T> resultType, @Nullable String index, @Nullable String type) {
ElasticsearchPersistentEntity<?> persistentEntity = lookupPersistentEntity(resultType);
GetRequest request = new GetRequest(persistentEntity.getIndexName(), persistentEntity.getIndexType(), id);
return goGet(id, persistentEntity, index, type).map(it -> mapper.mapEntity(it.sourceAsString(), resultType));
}
/**
* Search the index for entities matching the given {@link CriteriaQuery query}.
*
* @param query must not be {@literal null}.
* @param resultType must not be {@literal null}.
* @param <T>
* @return
*/
public <T> Flux<T> query(CriteriaQuery query, Class<T> resultType) {
ElasticsearchPersistentEntity<?> entity = lookupPersistentEntity(resultType);
SearchRequest request = new SearchRequest(indices(query, entity));
request.types(indexTypes(query, entity));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(mappedQuery(query, entity));
// TODO: request.source().postFilter(elasticsearchFilter); -- filter query
searchSourceBuilder.version(entity.hasVersionProperty()); // This has been true by default before
searchSourceBuilder.trackScores(query.getTrackScores());
if (query.getSourceFilter() != null) {
searchSourceBuilder.fetchSource(query.getSourceFilter().getIncludes(), query.getSourceFilter().getExcludes());
}
if (query.getPageable().isPaged()) {
long offset = query.getPageable().getOffset();
if (offset > Integer.MAX_VALUE) {
throw new IllegalArgumentException(String.format("Offset must not be more than %s", Integer.MAX_VALUE));
}
searchSourceBuilder.from((int) offset);
searchSourceBuilder.size(query.getPageable().getPageSize());
}
if (query.getIndicesOptions() != null) {
request.indicesOptions(query.getIndicesOptions());
}
sort(query, entity).forEach(searchSourceBuilder::sort);
if (query.getMinScore() > 0) {
searchSourceBuilder.minScore(query.getMinScore());
}
request.source(searchSourceBuilder);
return Flux.from(
execute(client -> client.search(request).map(it -> mapper.mapEntity(it.getSourceAsString(), resultType))));
}
/**
* Execute within a {@link ClientCallback} managing resources and translating errors.
*
* @param callback must not be {@literal null}.
* @param <T>
* @return the {@link Publisher} emitting results.
*/
public <T> Publisher<T> execute(ClientCallback<Publisher<T>> callback) {
return Flux.from(callback.doWithClient(this.client)).onErrorMap(this::translateException);
}
// Customization Hooks
protected Mono<GetResult> goGet(String id, ElasticsearchPersistentEntity<?> entity, @Nullable String index,
@Nullable String type) {
String indexToUse = indexName(index, entity);
String typeToUse = typeName(type, entity);
return doGet(new GetRequest(indexToUse, typeToUse, id));
}
protected Mono<GetResult> doGet(GetRequest request) {
return Mono.from(execute(client -> client.get(request))) //
.onErrorResume((it) -> {
if (it instanceof HttpClientErrorException) {
return ((HttpClientErrorException) it).getRawStatusCode() == 404;
}
return false;
}, (it) -> Mono.empty());
}
protected Mono<IndexResponse> doIndex(Object value, ElasticsearchPersistentEntity<?> entity, @Nullable String index,
@Nullable String type) {
PersistentPropertyAccessor propertyAccessor = entity.getPropertyAccessor(value);
Object id = propertyAccessor.getProperty(entity.getIdProperty());
String indexToUse = indexName(index, entity);
String typeToUse = typeName(type, entity);
IndexRequest request = id != null ? new IndexRequest(indexToUse, typeToUse, id.toString())
: new IndexRequest(indexToUse, typeToUse);
try {
request.source(mapper.getEntityMapper().mapToString(value), Requests.INDEX_CONTENT_TYPE);
} catch (IOException e) {
throw new RuntimeException(e);
}
if (entity.hasVersionProperty()) {
Object version = propertyAccessor.getProperty(entity.getVersionProperty());
if (version != null) {
request.version(((Number) version).longValue());
request.versionType(EXTERNAL);
}
}
if (entity.getParentIdProperty() != null) {
Object parentId = propertyAccessor.getProperty(entity.getParentIdProperty());
if (parentId != null) {
request.parent(parentId.toString());
}
}
return doIndex(request.setRefreshPolicy(RefreshPolicy.IMMEDIATE));
}
protected Mono<IndexResponse> doIndex(IndexRequest request) {
return Mono.from(execute(client -> client.index(request)));
}
// private helpers
private static String indexName(@Nullable String index, ElasticsearchPersistentEntity<?> entity) {
return StringUtils.isEmpty(index) ? entity.getIndexName() : index;
}
private static String typeName(@Nullable String type, ElasticsearchPersistentEntity<?> entity) {
return StringUtils.isEmpty(type) ? entity.getIndexType() : type;
}
private static String[] indices(CriteriaQuery query, ElasticsearchPersistentEntity<?> entity) {
if (query.getIndices().isEmpty()) {
return new String[] { entity.getIndexName() };
}
return query.getIndices().toArray(new String[0]);
}
private static String[] indexTypes(CriteriaQuery query, ElasticsearchPersistentEntity<?> entity) {
if (query.getTypes().isEmpty()) {
return new String[] { entity.getIndexType() };
}
return query.getTypes().toArray(new String[0]);
}
private List<FieldSortBuilder> sort(Query query, ElasticsearchPersistentEntity<?> entity) {
if (query.getSort() == null || query.getSort().isUnsorted()) {
return Collections.emptyList();
}
List<FieldSortBuilder> mappedSort = new ArrayList<>();
for (Sort.Order order : query.getSort()) {
FieldSortBuilder sort = SortBuilders.fieldSort(entity.getPersistentProperty(order.getProperty()).getFieldName())
.order(order.getDirection().isDescending() ? SortOrder.DESC : SortOrder.ASC);
if (order.getNullHandling() == Sort.NullHandling.NULLS_FIRST) {
sort.missing("_first");
} else if (order.getNullHandling() == Sort.NullHandling.NULLS_LAST) {
sort.missing("_last");
}
mappedSort.add(sort);
}
return mappedSort;
}
private QueryBuilder mappedQuery(CriteriaQuery query, ElasticsearchPersistentEntity<?> entity) {
// TODO: we need to actually map the fields to the according field names!
QueryBuilder elasticsearchQuery = new CriteriaQueryProcessor().createQueryFromCriteria(query.getCriteria());
return elasticsearchQuery != null ? elasticsearchQuery : QueryBuilders.matchAllQuery();
}
private QueryBuilder mappedFilterQuery(CriteriaQuery query, ElasticsearchPersistentEntity<?> entity) {
// TODO: this is actually strange in the RestTemplate:L378 - need to chack
return null;
}
private ElasticsearchPersistentEntity<?> lookupPersistentEntity(Class<?> type) {
return converter.getMappingContext().getPersistentEntity(type);
}
private Throwable translateException(Throwable throwable) {
if (!(throwable instanceof RuntimeException)) {
return throwable;
}
RuntimeException ex = exceptionTranslator.translateExceptionIfPossible((RuntimeException) throwable);
return ex != null ? ex : throwable;
}
// Additional types
public interface ClientCallback<T extends Publisher> {
T doWithClient(ReactiveElasticsearchClient client);
}
}

View File

@ -0,0 +1,63 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch;
import lombok.SneakyThrows;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.data.elasticsearch.client.ElasticsearchClients;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.util.ObjectUtils;
/**
* @author Christoph Strobl
* @currentRead Fool's Fate - Robin Hobb
*/
public final class TestUtils {
private TestUtils() {}
public static RestHighLevelClient restHighLevelClient() {
return ElasticsearchClients.createClient().connectedToLocalhost().rest();
}
public static ReactiveElasticsearchClient reactiveClient() {
return ElasticsearchClients.createClient().connectedToLocalhost().reactive();
}
@SneakyThrows
public static void deleteIndex(String... indexes) {
if (ObjectUtils.isEmpty(indexes)) {
return;
}
try (RestHighLevelClient client = restHighLevelClient()) {
for (String index : indexes) {
try {
client.indices().delete(new DeleteIndexRequest(index), RequestOptions.DEFAULT);
} catch (ElasticsearchStatusException ex) {
// just ignore it
}
}
}
}
}

View File

@ -0,0 +1,141 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client.reactive;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import org.junit.Before;
import org.junit.Test;
import org.springframework.data.elasticsearch.client.reactive.HostProvider.VerificationMode;
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
import org.springframework.data.elasticsearch.client.ElasticsearchHost.State;
import org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockDelegatingElasticsearchHostProvider;
import org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.WebClientProvider.Receive;
import org.springframework.web.reactive.function.client.ClientResponse;
/**
* @author Christoph Strobl
* @currentRead Golden Fool - Robin Hobb
*/
public class MultiNodeHostProviderUnitTests {
static final String HOST_1 = ":9200";
static final String HOST_2 = ":9201";
static final String HOST_3 = ":9202";
MockDelegatingElasticsearchHostProvider<MultiNodeHostProvider> mock;
MultiNodeHostProvider provider;
@Before
public void setUp() {
mock = ReactiveMockClientTestsUtils.multi(HOST_1, HOST_2, HOST_3);
provider = mock.getDelegate();
}
@Test // DATAES-488
public void refreshHostStateShouldUpdateNodeStateCorrectly() {
mock.when(HOST_1).receive(Receive::error);
mock.when(HOST_2).receive(Receive::ok);
mock.when(HOST_3).receive(Receive::ok);
provider.clusterInfo().as(StepVerifier::create).expectNextCount(1).verifyComplete();
assertThat(provider.getCachedHostState()).extracting(ElasticsearchHost::getState).containsExactly(State.OFFLINE,
State.ONLINE, State.ONLINE);
}
@Test // DATAES-488
public void getActiveReturnsFirstActiveHost() {
mock.when(HOST_1).receive(Receive::error);
mock.when(HOST_2).receive(Receive::ok);
mock.when(HOST_3).receive(Receive::error);
provider.getActive().as(StepVerifier::create).expectNext(mock.client(HOST_2)).verifyComplete();
}
@Test // DATAES-488
public void getActiveErrorsWhenNoActiveHostFound() {
mock.when(HOST_1).receive(Receive::error);
mock.when(HOST_2).receive(Receive::error);
mock.when(HOST_3).receive(Receive::error);
provider.getActive().as(StepVerifier::create).expectError(IllegalStateException.class);
}
@Test // DATAES-488
public void lazyModeDoesNotResolveHostsTwice() {
mock.when(HOST_1).receive(Receive::error);
mock.when(HOST_2).receive(Receive::ok);
mock.when(HOST_3).receive(Receive::error);
provider.clusterInfo().as(StepVerifier::create).expectNextCount(1).verifyComplete();
provider.getActive(VerificationMode.LAZY).as(StepVerifier::create).expectNext(mock.client(HOST_2)).verifyComplete();
verify(mock.client(":9201")).head();
}
@Test // DATAES-488
public void alwaysModeDoesNotResolveHostsTwice() {
mock.when(HOST_1).receive(Receive::error);
mock.when(HOST_2).receive(Receive::ok);
mock.when(HOST_3).receive(Receive::error);
provider.clusterInfo().as(StepVerifier::create).expectNextCount(1).verifyComplete();
provider.getActive(VerificationMode.FORCE).as(StepVerifier::create).expectNext(mock.client(HOST_2))
.verifyComplete();
verify(mock.client(HOST_2), times(2)).head();
}
@Test // DATAES-488
public void triesDeadHostsIfNoActiveFound() {
mock.when(HOST_1).receive(Receive::error);
mock.when(HOST_2).get(requestHeadersUriSpec -> {
ClientResponse response1 = mock(ClientResponse.class);
Receive.error(response1);
ClientResponse response2 = mock(ClientResponse.class);
Receive.ok(response2);
when(requestHeadersUriSpec.exchange()).thenReturn(Mono.just(response1), Mono.just(response2));
});
mock.when(HOST_3).receive(Receive::error);
provider.clusterInfo().as(StepVerifier::create).expectNextCount(1).verifyComplete();
assertThat(provider.getCachedHostState()).extracting(ElasticsearchHost::getState).containsExactly(State.OFFLINE,
State.OFFLINE, State.OFFLINE);
provider.getActive().as(StepVerifier::create).expectNext(mock.client(HOST_2)).verifyComplete();
verify(mock.client(HOST_2), times(2)).head();
}
}

View File

@ -0,0 +1,467 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client.reactive;
import static org.assertj.core.api.Assertions.*;
import reactor.test.StepVerifier;
import java.io.IOException;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.data.elasticsearch.TestUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.lang.Nullable;
/**
* @author Christoph Strobl
* @currentRead Fool's Fate - Robin Hobb
*/
public class ReactiveElasticsearchClientTests {
static final String INDEX_I = "idx-1-reactive-client-tests";
static final String INDEX_II = "idx-2-reactive-client-tests";
static final String TYPE_I = "doc-type-1";
static final String TYPE_II = "doc-type-2";
static final Map<String, String> DOC_SOURCE;
RestHighLevelClient syncClient;
ReactiveElasticsearchClient client;
static {
Map<String, String> source = new LinkedHashMap<>();
source.put("firstname", "chade");
source.put("lastname", "fallstar");
DOC_SOURCE = Collections.unmodifiableMap(source);
}
@Before
public void setUp() {
syncClient = TestUtils.restHighLevelClient();
client = TestUtils.reactiveClient();
}
@After
public void after() throws IOException {
TestUtils.deleteIndex(INDEX_I, INDEX_II);
syncClient.close();
}
@Test // DATAES-488
public void pingForActiveHostShouldReturnTrue() {
client.ping().as(StepVerifier::create) //
.expectNext(true) //
.verifyComplete();
}
@Test // DATAES-488
public void pingForUnknownHostShouldReturnFalse() {
DefaultReactiveElasticsearchClient.create(HttpHeaders.EMPTY, "http://localhost:4711").ping() //
.as(StepVerifier::create) //
.expectNext(false) //
.verifyComplete();
}
@Test // DATAES-488
public void infoShouldReturnClusterInformation() {
client.info().as(StepVerifier::create) //
.consumeNextWith(it -> {
assertThat(it.isAvailable()).isTrue();
assertThat(it.getVersion()).isGreaterThanOrEqualTo(Version.CURRENT);
}) //
.verifyComplete();
}
@Test // DATAES-488
public void getShouldFetchDocumentById() {
String id = addSourceDocument().ofType(TYPE_I).to(INDEX_I);
client.get(new GetRequest(INDEX_I, TYPE_I, id)) //
.as(StepVerifier::create) //
.consumeNextWith(it -> {
assertThat(it.getId()).isEqualTo(id);
assertThat(it.getSource()).containsAllEntriesOf(DOC_SOURCE);
}) //
.verifyComplete();
}
@Test // DATAES-488
public void getShouldCompleteForNonExistingDocuments() {
addSourceDocument().ofType(TYPE_I).to(INDEX_I);
String id = "this-one-does-not-exist";
client.get(new GetRequest(INDEX_I, TYPE_I, id)) //
.as(StepVerifier::create) //
.verifyComplete();
}
@Test // DATAES-488
public void getShouldCompleteForNonExistingType() {
String id = addSourceDocument().ofType(TYPE_I).to(INDEX_I);
client.get(new GetRequest(INDEX_I, "fantasy-books", id)) //
.as(StepVerifier::create) //
.verifyComplete();
}
@Test // DATAES-488
public void multiGetShouldReturnAllDocumentsFromSameCollection() {
String id1 = addSourceDocument().ofType(TYPE_I).to(INDEX_I);
String id2 = addSourceDocument().ofType(TYPE_I).to(INDEX_I);
MultiGetRequest request = new MultiGetRequest() //
.add(INDEX_I, TYPE_I, id1) //
.add(INDEX_I, TYPE_I, id2);
client.multiGet(request) //
.as(StepVerifier::create) //
.consumeNextWith(it -> {
assertThat(it.getId()).isEqualTo(id1);
}) //
.consumeNextWith(it -> {
assertThat(it.getId()).isEqualTo(id2);
}) //
.verifyComplete();
}
@Test // DATAES-488
public void multiGetShouldReturnAllExistingDocumentsFromSameCollection() {
String id1 = addSourceDocument().ofType(TYPE_I).to(INDEX_I);
addSourceDocument().ofType(TYPE_I).to(INDEX_I);
MultiGetRequest request = new MultiGetRequest() //
.add(INDEX_I, TYPE_I, id1) //
.add(INDEX_I, TYPE_I, "this-one-does-not-exist");
client.multiGet(request) //
.as(StepVerifier::create) //
.consumeNextWith(it -> {
assertThat(it.getId()).isEqualTo(id1);
}) //
.verifyComplete();
}
@Test // DATAES-488
public void multiGetShouldSkipNonExistingDocuments() {
String id1 = addSourceDocument().ofType(TYPE_I).to(INDEX_I);
String id2 = addSourceDocument().ofType(TYPE_I).to(INDEX_I);
MultiGetRequest request = new MultiGetRequest() //
.add(INDEX_I, TYPE_I, id1) //
.add(INDEX_I, TYPE_I, "this-one-does-not-exist") //
.add(INDEX_I, TYPE_I, id2); //
client.multiGet(request) //
.as(StepVerifier::create) //
.consumeNextWith(it -> {
assertThat(it.getId()).isEqualTo(id1);
}) //
.consumeNextWith(it -> {
assertThat(it.getId()).isEqualTo(id2);
}) //
.verifyComplete();
}
@Test // DATAES-488
public void multiGetShouldCompleteIfNothingFound() {
String id1 = addSourceDocument().ofType(TYPE_I).to(INDEX_I);
String id2 = addSourceDocument().ofType(TYPE_I).to(INDEX_I);
client.multiGet(new MultiGetRequest().add(INDEX_II, TYPE_I, id1).add(INDEX_II, TYPE_I, id2)) //
.as(StepVerifier::create) //
.verifyComplete();
}
@Test // DATAES-488
public void multiGetShouldReturnAllExistingDocumentsFromDifferentCollection() {
String id1 = addSourceDocument().ofType(TYPE_I).to(INDEX_I);
String id2 = addSourceDocument().ofType(TYPE_II).to(INDEX_II);
MultiGetRequest request = new MultiGetRequest() //
.add(INDEX_I, TYPE_I, id1) //
.add(INDEX_II, TYPE_II, id2);
client.multiGet(request) //
.as(StepVerifier::create) //
.consumeNextWith(it -> {
assertThat(it.getId()).isEqualTo(id1);
}) //
.consumeNextWith(it -> {
assertThat(it.getId()).isEqualTo(id2);
}) //
.verifyComplete();
}
@Test // DATAES-488
public void existsReturnsTrueForExistingDocuments() {
String id = addSourceDocument().ofType(TYPE_I).to(INDEX_I);
client.exists(new GetRequest(INDEX_I, TYPE_I, id)) //
.as(StepVerifier::create) //
.expectNext(true)//
.verifyComplete();
}
@Test // DATAES-488
public void existsReturnsFalseForNonExistingDocuments() {
String id = addSourceDocument().ofType(TYPE_I).to(INDEX_I);
client.exists(new GetRequest(INDEX_II, TYPE_I, id)) //
.as(StepVerifier::create) //
.expectNext(false)//
.verifyComplete();
}
@Test // DATAES-488
public void indexShouldAddDocument() {
IndexRequest request = indexRequest(DOC_SOURCE, INDEX_I, TYPE_I);
client.index(request) //
.as(StepVerifier::create) //
.consumeNextWith(it -> {
assertThat(it.status()).isEqualTo(RestStatus.CREATED);
assertThat(it.getId()).isEqualTo(request.id());
})//
.verifyComplete();
}
@Test // DATAES-488
public void indexShouldErrorForExistingDocuments() {
String id = addSourceDocument().ofType(TYPE_I).to(INDEX_I);
IndexRequest request = indexRequest(DOC_SOURCE, INDEX_I, TYPE_I)//
.id(id);
client.index(request) //
.as(StepVerifier::create) //
.consumeErrorWith(error -> {
assertThat(error).isInstanceOf(ElasticsearchStatusException.class);
}) //
.verify();
}
@Test // DATAES-488
public void updateShouldUpsertNonExistingDocumentWhenUsedWithUpsert() {
String id = UUID.randomUUID().toString();
UpdateRequest request = new UpdateRequest(INDEX_I, TYPE_I, id) //
.doc(DOC_SOURCE) //
.docAsUpsert(true);
client.update(request) //
.as(StepVerifier::create) //
.consumeNextWith(it -> {
assertThat(it.status()).isEqualTo(RestStatus.CREATED);
assertThat(it.getId()).isEqualTo(id);
}) //
.verifyComplete();
}
@Test // DATAES-488
public void updateShouldUpdateExistingDocument() {
String id = addSourceDocument().ofType(TYPE_I).to(INDEX_I);
UpdateRequest request = new UpdateRequest(INDEX_I, TYPE_I, id) //
.doc(Collections.singletonMap("dutiful", "farseer"));
client.update(request) //
.as(StepVerifier::create) //
.consumeNextWith(it -> {
assertThat(it.status()).isEqualTo(RestStatus.OK);
assertThat(it.getId()).isEqualTo(id);
assertThat(it.getVersion()).isEqualTo(2);
}) //
.verifyComplete();
}
@Test // DATAES-488
public void updateShouldErrorNonExistingDocumentWhenNotUpserted() {
String id = UUID.randomUUID().toString();
UpdateRequest request = new UpdateRequest(INDEX_I, TYPE_I, id) //
.doc(DOC_SOURCE);
client.update(request) //
.as(StepVerifier::create) //
.consumeErrorWith(error -> {
assertThat(error).isInstanceOf(ElasticsearchStatusException.class);
}) //
.verify();
}
@Test // DATAES-488
public void deleteShouldRemoveExistingDocument() {
String id = addSourceDocument().ofType(TYPE_I).to(INDEX_I);
DeleteRequest request = new DeleteRequest(INDEX_I, TYPE_I, id);
client.delete(request) //
.as(StepVerifier::create) //
.consumeNextWith(it -> {
assertThat(it.status()).isEqualTo(RestStatus.OK);
}) //
.verifyComplete();
}
@Test // DATAES-488
public void deleteShouldReturnNotFoundForNonExistingDocument() {
addSourceDocument().ofType(TYPE_I).to(INDEX_I);
DeleteRequest request = new DeleteRequest(INDEX_I, TYPE_I, "this-one-does-not-exist");
client.delete(request) //
.as(StepVerifier::create) //
.consumeNextWith(it -> {
assertThat(it.status()).isEqualTo(RestStatus.NOT_FOUND);
}) //
.verifyComplete();
}
@Test // DATAES-488
public void searchShouldFindExistingDocuments() {
addSourceDocument().ofType(TYPE_I).to(INDEX_I);
addSourceDocument().ofType(TYPE_I).to(INDEX_I);
SearchRequest request = new SearchRequest(INDEX_I).types(TYPE_I) //
.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()));
client.search(request) //
.as(StepVerifier::create) //
.expectNextCount(2) //
.verifyComplete();
}
@Test // DATAES-488
public void searchShouldCompleteIfNothingFound() throws IOException {
syncClient.indices().create(new CreateIndexRequest(INDEX_I));
SearchRequest request = new SearchRequest(INDEX_I).types(TYPE_I) //
.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()));
client.search(request) //
.as(StepVerifier::create) //
.verifyComplete();
}
AddToIndexOfType addSourceDocument() {
return add(DOC_SOURCE);
}
AddToIndexOfType add(Map source) {
return new AddDocument(source);
}
IndexRequest indexRequest(Map source, String index, String type) {
return new IndexRequest(index, type) //
.id(UUID.randomUUID().toString()) //
.source(source) //
.setRefreshPolicy(RefreshPolicy.IMMEDIATE) //
.create(true);
}
String doIndex(Map source, String index, String type) {
try {
return syncClient.index(indexRequest(source, index, type)).getId();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
interface AddToIndexOfType extends AddToIndex {
AddToIndex ofType(String type);
}
interface AddToIndex {
String to(String index);
}
class AddDocument implements AddToIndexOfType {
Map source;
@Nullable String type;
AddDocument(Map source) {
this.source = source;
}
@Override
public AddToIndex ofType(String type) {
this.type = type;
return this;
}
@Override
public String to(String index) {
return doIndex(new LinkedHashMap(source), index, type);
}
}
}

View File

@ -0,0 +1,554 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client.reactive;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.*;
import static org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.WebClientProvider.Receive.*;
import reactor.test.StepVerifier;
import java.util.Collections;
import java.util.Map;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.DocWriteResponse.Result;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockDelegatingElasticsearchHostProvider;
import org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.WebClientProvider.Receive;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
/**
* @author Christoph Strobl
* @currentRead Golden Fool - Robin Hobb
*/
public class ReactiveElasticsearchClientUnitTests {
static final String HOST = ":9200";
MockDelegatingElasticsearchHostProvider<HostProvider> hostProvider;
ReactiveElasticsearchClient client;
@Before
public void setUp() {
hostProvider = ReactiveMockClientTestsUtils.provider(HOST).withActiveDefaultHost(HOST);
client = new DefaultReactiveElasticsearchClient(hostProvider);
}
// --> PING
@Test
public void pingShouldHitMainEndpoint() {
hostProvider.when(HOST) //
.receive(Receive::ok);
client.ping() //
.then() //
.as(StepVerifier::create) //
.verifyComplete();
hostProvider.when(HOST).exchange(requestBodyUriSpec -> {
verify(requestBodyUriSpec).uri(eq("/"), any(Map.class));
});
}
@Test // DATAES-488
public void pingShouldReturnTrueOnHttp200() {
hostProvider.when(HOST) //
.receive(Receive::ok);
client.ping() //
.as(StepVerifier::create) //
.expectNext(true) //
.verifyComplete();
}
@Test // DATAES-488
public void pingShouldReturnFalseOnNonHttp200() {
hostProvider.when(HOST) //
.receive(Receive::error);
client.ping() //
.as(StepVerifier::create) //
.expectNext(false) //
.verifyComplete();
}
// --> INFO
@Test
public void infoShouldHitMainEndpoint() {
hostProvider.when(HOST) //
.receiveInfo();
client.info() //
.then() //
.as(StepVerifier::create) //
.verifyComplete();
hostProvider.when(HOST).exchange(requestBodyUriSpec -> {
verify(requestBodyUriSpec).uri(eq("/"), any(Map.class));
});
}
@Test // DATAES-488
public void infoShouldReturnResponseCorrectly() {
hostProvider.when(HOST) //
.receiveInfo();
client.info() //
.as(StepVerifier::create) //
.consumeNextWith(mainResponse -> {}) //
.verifyComplete();
}
// --> GET
@Test // DATAES-488
public void getShouldHitGetEndpoint() {
hostProvider.when(HOST).receive(clientResponse -> {
when(clientResponse.statusCode()).thenReturn(HttpStatus.ACCEPTED, HttpStatus.NOT_FOUND);
});
hostProvider.when(HOST) //
.receiveGetByIdNotFound();
client.get(new GetRequest("twitter").id("1")) //
.then() //
.as(StepVerifier::create) //
.verifyComplete();
verify(hostProvider.client(HOST)).method(HttpMethod.GET);
hostProvider.when(HOST).exchange(requestBodyUriSpec -> {
verify(requestBodyUriSpec).uri(eq("/twitter/_all/1"), any(Map.class));
});
}
@Test // DATAES-488
public void getShouldReturnExistingDocument() {
hostProvider.when(HOST) //
.receiveGetById();
client.get(new GetRequest("twitter").id("1")) //
.as(StepVerifier::create) //
.consumeNextWith(result -> {
assertThat(result.isExists()).isTrue();
assertThat(result.getIndex()).isEqualTo("twitter");
assertThat(result.getId()).isEqualTo("1");
assertThat(result.getSource()) //
.containsEntry("user", "kimchy") //
.containsEntry("message", "Trying out Elasticsearch, so far so good?") //
.containsKey("post_date");
}) //
.verifyComplete();
}
@Test // DATAES-488
public void getShouldReturnEmptyForNonExisting() {
hostProvider.when(HOST) //
.receiveGetByIdNotFound();
client.get(new GetRequest("twitter").id("1")) //
.as(StepVerifier::create) //
.verifyComplete();
}
// --> MGET
@Test // DATAES-488
public void multiGetShouldHitMGetEndpoint() {
hostProvider.when(HOST) //
.receiveJsonFromFile("multi-get-ok-2-hits");
client.multiGet(new MultiGetRequest().add("twitter", "_doc", "1").add("twitter", "_doc", "2")) //
.then() //
.as(StepVerifier::create) //
.verifyComplete();
verify(hostProvider.client(HOST)).method(HttpMethod.POST);
hostProvider.when(HOST).exchange(requestBodyUriSpec -> {
verify(requestBodyUriSpec).uri(eq("/_mget"), any(Map.class));
verify(requestBodyUriSpec).body(any(Publisher.class), any(Class.class));
});
}
@Test // DATAES-488
public void multiGetShouldReturnExistingDocuments() {
hostProvider.when(HOST) //
.receiveJsonFromFile("multi-get-ok-2-hits");
client.multiGet(new MultiGetRequest().add("twitter", "_doc", "1").add("twitter", "_doc", "2")) //
.as(StepVerifier::create) //
.consumeNextWith(result -> {
assertThat(result.isExists()).isTrue();
assertThat(result.getIndex()).isEqualTo("twitter");
assertThat(result.getId()).isEqualTo("1");
assertThat(result.getSource()) //
.containsEntry("user", "kimchy") //
.containsEntry("message", "Trying out Elasticsearch, so far so good?") //
.containsKey("post_date");
}) //
.consumeNextWith(result -> {
assertThat(result.isExists()).isTrue();
assertThat(result.getIndex()).isEqualTo("twitter");
assertThat(result.getId()).isEqualTo("2");
assertThat(result.getSource()) //
.containsEntry("user", "kimchy") //
.containsEntry("message", "Another tweet, will it be indexed?") //
.containsKey("post_date");
}) //
.verifyComplete();
}
@Test // DATAES-488
public void multiGetShouldWorkForNonExistingDocuments() {
hostProvider.when(HOST) //
.receiveJsonFromFile("multi-get-ok-2-hits-1-unavailable");
client.multiGet(new MultiGetRequest().add("twitter", "_doc", "1").add("twitter", "_doc", "2")) //
.as(StepVerifier::create) //
.consumeNextWith(result -> {
assertThat(result.isExists()).isTrue();
assertThat(result.getIndex()).isEqualTo("twitter");
assertThat(result.getId()).isEqualTo("1");
assertThat(result.getSource()) //
.containsEntry("user", "kimchy") //
.containsEntry("message", "Trying out Elasticsearch, so far so good?") //
.containsKey("post_date");
}) //
.consumeNextWith(result -> {
assertThat(result.isExists()).isTrue();
assertThat(result.getIndex()).isEqualTo("twitter");
assertThat(result.getId()).isEqualTo("3");
assertThat(result.getSource()) //
.containsEntry("user", "elastic") //
.containsEntry("message", "Building the site, should be kewl") //
.containsKey("post_date");
}) //
.verifyComplete();
}
// --> EXISTS
@Test // DATAES-488
public void existsShouldHitGetEndpoint() {
hostProvider.when(HOST) //
.receiveGetById();
client.exists(new GetRequest("twitter").id("1")) //
.then() //
.as(StepVerifier::create) //
.verifyComplete();
verify(hostProvider.client(HOST)).method(HttpMethod.HEAD);
hostProvider.when(HOST).exchange(requestBodyUriSpec -> {
verify(requestBodyUriSpec).uri(eq("/twitter/_all/1"), any(Map.class));
});
}
@Test // DATAES-488
public void existsShouldReturnTrueIfExists() {
hostProvider.when(HOST) //
.receiveGetById();
client.exists(new GetRequest("twitter").id("1")) //
.as(StepVerifier::create) //
.expectNext(true).verifyComplete();
}
@Test // DATAES-488
public void existsShouldReturnFalseIfNotExists() {
hostProvider.when(HOST) //
.receiveGetByIdNotFound();
client.exists(new GetRequest("twitter").id("1")) //
.as(StepVerifier::create) //
.expectNext(false).verifyComplete();
}
// --> INDEX
@Test // DATAES-488
public void indexNewShouldHitCreateEndpoint() {
hostProvider.when(HOST) //
.receiveIndexCreated();
client.index(new IndexRequest("twitter").id("10").create(true).source(" { foo : \"bar\" }", XContentType.JSON))
.then() //
.as(StepVerifier::create) //
.verifyComplete();
verify(hostProvider.client(HOST)).method(HttpMethod.PUT);
hostProvider.when(HOST).exchange(requestBodyUriSpec -> {
verify(requestBodyUriSpec).uri(eq("/twitter/10/_create"), any(Map.class));
verify(requestBodyUriSpec).contentType(MediaType.APPLICATION_JSON);
});
}
@Test // DATAES-488
public void indexExistingShouldHitEndpointCorrectly() {
hostProvider.when(HOST) //
.receiveIndexUpdated();
client.index(new IndexRequest("twitter").id("10").source(" { foo : \"bar\" }", XContentType.JSON)).then() //
.as(StepVerifier::create) //
.verifyComplete();
verify(hostProvider.client(HOST)).method(HttpMethod.PUT);
hostProvider.when(HOST).exchange(requestBodyUriSpec -> {
verify(requestBodyUriSpec).uri(eq("/twitter/10"), any(Map.class));
verify(requestBodyUriSpec).contentType(MediaType.APPLICATION_JSON);
});
}
@Test // DATAES-488
public void indexShouldReturnCreatedWhenNewDocumentIndexed() {
hostProvider.when(HOST) //
.receiveIndexCreated();
client.index(new IndexRequest("twitter").id("10").create(true).source(" { foo : \"bar\" }", XContentType.JSON))
.as(StepVerifier::create) //
.consumeNextWith(response -> {
assertThat(response.getId()).isEqualTo("10");
assertThat(response.getIndex()).isEqualTo("twitter");
assertThat(response.getResult()).isEqualTo(Result.CREATED);
}) //
.verifyComplete();
}
@Test // DATAES-488
public void indexShouldReturnUpdatedWhenExistingDocumentIndexed() {
hostProvider.when(HOST) //
.receiveIndexUpdated();
client.index(new IndexRequest("twitter").id("1").source(" { foo : \"bar\" }", XContentType.JSON))
.as(StepVerifier::create) //
.consumeNextWith(response -> {
assertThat(response.getId()).isEqualTo("1");
assertThat(response.getIndex()).isEqualTo("twitter");
assertThat(response.getResult()).isEqualTo(Result.UPDATED);
}) //
.verifyComplete();
}
// --> UPDATE
@Test // DATAES-488
public void updateShouldHitEndpointCorrectly() {
hostProvider.when(HOST) //
.receiveUpdateOk();
client.update(new UpdateRequest("twitter", "doc", "1").doc(Collections.singletonMap("user", "cstrobl"))).then() //
.as(StepVerifier::create) //
.verifyComplete();
verify(hostProvider.client(HOST)).method(HttpMethod.POST);
hostProvider.when(HOST).exchange(requestBodyUriSpec -> {
verify(requestBodyUriSpec).uri(eq("/twitter/doc/1/_update"), any(Map.class));
verify(requestBodyUriSpec).contentType(MediaType.APPLICATION_JSON);
});
}
@Test // DATAES-488
public void updateShouldEmitResponseCorrectly() {
hostProvider.when(HOST) //
.receiveUpdateOk();
client.update(new UpdateRequest("twitter", "doc", "1").doc(Collections.singletonMap("user", "cstrobl")))
.as(StepVerifier::create) //
.consumeNextWith(updateResponse -> {
assertThat(updateResponse.getResult()).isEqualTo(Result.UPDATED);
assertThat(updateResponse.getVersion()).isEqualTo(2);
assertThat(updateResponse.getId()).isEqualTo("1");
assertThat(updateResponse.getIndex()).isEqualTo("twitter");
}) //
.verifyComplete();
}
@Test // DATAES-488
public void updateShouldEmitErrorWhenNotFound() {
hostProvider.when(HOST) //
.updateFail();
client.update(new UpdateRequest("twitter", "doc", "1").doc(Collections.singletonMap("user", "cstrobl")))
.as(StepVerifier::create) //
.expectError(ElasticsearchStatusException.class) //
.verify();
}
// --> DELETE
@Test // DATAES-488
public void deleteShouldHitEndpointCorrectly() {
hostProvider.when(HOST) //
.receiveDeleteOk();
client.delete(new DeleteRequest("twitter", "doc", "1")).then() //
.as(StepVerifier::create) //
.verifyComplete();
verify(hostProvider.client(HOST)).method(HttpMethod.DELETE);
hostProvider.when(HOST).exchange(requestBodyUriSpec -> {
verify(requestBodyUriSpec).uri(eq("/twitter/doc/1"), any(Map.class));
});
}
@Test // DATAES-488
public void deleteShouldEmitResponseCorrectly() {
hostProvider.when(HOST) //
.receiveDeleteOk();
client.delete(new DeleteRequest("twitter", "doc", "1")) //
.as(StepVerifier::create) //
.consumeNextWith(deleteResponse -> {
assertThat(deleteResponse.getResult()).isEqualTo(Result.DELETED);
assertThat(deleteResponse.getVersion()).isEqualTo(1);
assertThat(deleteResponse.getId()).isEqualTo("1");
assertThat(deleteResponse.getIndex()).isEqualTo("twitter");
}) //
.verifyComplete();
}
// --> SEARCH
@Test // DATAES-488
public void searchShouldHitSearchEndpoint() {
hostProvider.when(HOST) //
.receiveSearchOk();
client.search(new SearchRequest("twitter")).as(StepVerifier::create).verifyComplete();
verify(hostProvider.client(HOST)).method(HttpMethod.POST);
hostProvider.when(HOST).exchange(requestBodyUriSpec -> {
verify(requestBodyUriSpec).uri(eq("/twitter/_search"), any(Map.class));
});
}
@Test // DATAES-488
public void searchShouldReturnSingleResultCorrectly() {
hostProvider.when(HOST) //
.receive(Receive::json) //
.body(fromPath("search-ok-single-hit"));
client.search(new SearchRequest("twitter")) //
.as(StepVerifier::create) //
.consumeNextWith(hit -> {
assertThat(hit.getId()).isEqualTo("2");
assertThat(hit.getIndex()).isEqualTo("twitter");
assertThat(hit.getSourceAsMap()) //
.containsEntry("user", "kimchy") //
.containsEntry("message", "Another tweet, will it be indexed?") //
.containsKey("post_date");
}).verifyComplete();
}
@Test // DATAES-488
public void searchShouldReturnMultipleResultsCorrectly() {
hostProvider.when(HOST) //
.receive(Receive::json) //
.body(fromPath("search-ok-multiple-hits"));
client.search(new SearchRequest("twitter")) //
.as(StepVerifier::create) //
.consumeNextWith(hit -> {
assertThat(hit.getId()).isEqualTo("2");
assertThat(hit.getIndex()).isEqualTo("twitter");
assertThat(hit.getSourceAsMap()) //
.containsEntry("user", "kimchy") //
.containsEntry("message", "Another tweet, will it be indexed?") //
.containsKey("post_date");
}) //
.consumeNextWith(hit -> {
assertThat(hit.getId()).isEqualTo("1");
assertThat(hit.getIndex()).isEqualTo("twitter");
assertThat(hit.getSourceAsMap()) //
.containsEntry("user", "kimchy") //
.containsEntry("message", "Trying out Elasticsearch, so far so good?") //
.containsKey("post_date");
}).verifyComplete();
}
@Test // DATAES-488
public void searchShouldReturnEmptyFluxIfNothingFound() {
hostProvider.when(HOST) //
.receiveSearchOk();
client.search(new SearchRequest("twitter")) //
.as(StepVerifier::create) //
.verifyComplete();
}
}

View File

@ -0,0 +1,432 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client.reactive;
import static org.mockito.Mockito.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.mockito.Mockito;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
import org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.WebClientProvider.Send;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.lang.Nullable;
import org.springframework.util.StreamUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec;
import org.springframework.web.reactive.function.client.WebClient.RequestHeadersUriSpec;
/**
* @author Christoph Strobl
* @since 2018/10
*/
public class ReactiveMockClientTestsUtils {
public static MockDelegatingElasticsearchHostProvider<SingleNodeHostProvider> single(String host) {
return provider(host);
}
public static MockDelegatingElasticsearchHostProvider<MultiNodeHostProvider> multi(String... hosts) {
return provider(hosts);
}
public static <T extends HostProvider> MockDelegatingElasticsearchHostProvider<T> provider(String... hosts) {
WebClientProvider clientProvider = new WebClientProvider();
ErrorCollector errorCollector = new ErrorCollector();
HostProvider delegate = null;
if (hosts.length == 1) {
delegate = new SingleNodeHostProvider(HttpHeaders.EMPTY, errorCollector, hosts[0]) {
@Override // hook in there to modify result
public WebClient createWebClient(String host, HttpHeaders headers) {
return clientProvider.get(host);
}
};
} else {
delegate = new MultiNodeHostProvider(HttpHeaders.EMPTY, errorCollector, hosts) {
@Override // hook in there to modify result
public WebClient createWebClient(String host, HttpHeaders headers) {
return clientProvider.get(host);
}
};
}
return new MockDelegatingElasticsearchHostProvider(HttpHeaders.EMPTY, clientProvider, errorCollector, delegate,
null);
}
public static class ErrorCollector implements Consumer<Throwable> {
List<Throwable> errors = new CopyOnWriteArrayList<>();
@Override
public void accept(Throwable throwable) {
errors.add(throwable);
}
List<Throwable> captured() {
return Collections.unmodifiableList(errors);
}
}
public static class MockDelegatingElasticsearchHostProvider<T extends HostProvider> implements HostProvider {
private final T delegate;
private final WebClientProvider clientProvider;
private final ErrorCollector errorCollector;
private @Nullable String activeDefaultHost;
public MockDelegatingElasticsearchHostProvider(HttpHeaders httpHeaders, WebClientProvider clientProvider,
ErrorCollector errorCollector, T delegate, String activeDefaultHost) {
this.errorCollector = errorCollector;
this.clientProvider = clientProvider;
this.delegate = delegate;
this.activeDefaultHost = activeDefaultHost;
}
public Mono<String> lookupActiveHost() {
return delegate.lookupActiveHost();
}
public Mono<String> lookupActiveHost(VerificationMode verificationMode) {
if (StringUtils.hasText(activeDefaultHost)) {
return Mono.just(activeDefaultHost);
}
return delegate.lookupActiveHost(verificationMode);
}
public Mono<WebClient> getActive() {
return delegate.getActive();
}
public Mono<WebClient> getActive(VerificationMode verificationMode) {
return delegate.getActive(verificationMode);
}
public Mono<WebClient> getActive(VerificationMode verificationMode, HttpHeaders headers) {
return delegate.getActive(verificationMode, headers);
}
public WebClient createWebClient(String host, HttpHeaders headers) {
return delegate.createWebClient(host, headers);
}
@Override
public Mono<ClusterInformation> clusterInfo() {
if (StringUtils.hasText(activeDefaultHost)) {
return Mono.just(new ClusterInformation(Collections.singleton(ElasticsearchHost.online(activeDefaultHost))));
}
return delegate.clusterInfo();
}
@Override
public HttpHeaders getDefaultHeaders() {
return delegate.getDefaultHeaders();
}
@Override
public HostProvider withDefaultHeaders(HttpHeaders headers) {
throw new UnsupportedOperationException();
}
public Send when(String host) {
return clientProvider.when(host);
}
public WebClient client(String host) {
return clientProvider.when(host).client();
}
public List<Throwable> errors() {
return errorCollector.captured();
}
public T getDelegate() {
return delegate;
}
@Override
public HostProvider withErrorListener(Consumer<Throwable> errorListener) {
throw new UnsupportedOperationException();
}
public MockDelegatingElasticsearchHostProvider<T> withActiveDefaultHost(String host) {
return new MockDelegatingElasticsearchHostProvider(HttpHeaders.EMPTY, clientProvider, errorCollector, delegate,
host);
}
}
public static class WebClientProvider {
private final Object lock = new Object();
private Map<String, WebClient> clientMap;
private Map<String, RequestHeadersUriSpec> headersUriSpecMap;
private Map<String, RequestBodyUriSpec> bodyUriSpecMap;
private Map<String, ClientResponse> responseMap;
public WebClientProvider() {
this.clientMap = new LinkedHashMap<>();
this.headersUriSpecMap = new LinkedHashMap<>();
this.bodyUriSpecMap = new LinkedHashMap<>();
this.responseMap = new LinkedHashMap<>();
}
public WebClient get(String host) {
synchronized (lock) {
return clientMap.computeIfAbsent(host, key -> {
WebClient webClient = mock(WebClient.class);
RequestHeadersUriSpec headersUriSpec = mock(RequestHeadersUriSpec.class);
Mockito.when(webClient.get()).thenReturn(headersUriSpec);
Mockito.when(webClient.head()).thenReturn(headersUriSpec);
Mockito.when(headersUriSpec.uri(any(String.class))).thenReturn(headersUriSpec);
Mockito.when(headersUriSpec.uri(any(), any(Map.class))).thenReturn(headersUriSpec);
Mockito.when(headersUriSpec.headers(any(Consumer.class))).thenReturn(headersUriSpec);
RequestBodyUriSpec bodyUriSpec = mock(RequestBodyUriSpec.class);
Mockito.when(webClient.method(any())).thenReturn(bodyUriSpec);
Mockito.when(bodyUriSpec.body(any())).thenReturn(headersUriSpec);
Mockito.when(bodyUriSpec.uri(any(), any(Map.class))).thenReturn(bodyUriSpec);
Mockito.when(bodyUriSpec.headers(any(Consumer.class))).thenReturn(bodyUriSpec);
ClientResponse response = mock(ClientResponse.class);
Mockito.when(headersUriSpec.exchange()).thenReturn(Mono.just(response));
Mockito.when(bodyUriSpec.exchange()).thenReturn(Mono.just(response));
Mockito.when(response.statusCode()).thenReturn(HttpStatus.ACCEPTED);
headersUriSpecMap.putIfAbsent(host, headersUriSpec);
bodyUriSpecMap.putIfAbsent(host, bodyUriSpec);
responseMap.putIfAbsent(host, response);
return webClient;
});
}
}
public Send when(String host) {
return new CallbackImpl(get(host), headersUriSpecMap.get(host), bodyUriSpecMap.get(host), responseMap.get(host));
}
public interface Client {
WebClient client();
}
public interface Send extends Receive, Client {
Receive get(Consumer<RequestHeadersUriSpec> headerSpec);
Receive exchange(Consumer<RequestBodyUriSpec> bodySpec);
default Receive receiveJsonFromFile(String file) {
return receive(Receive::json) //
.body(Receive.fromPath(file));
}
default Receive receiveInfo() {
return receiveJsonFromFile("info") //
.receive(Receive::ok);
}
default Receive receiveIndexCreated() {
return receiveJsonFromFile("index-ok-created") //
.receive(Receive::ok);
}
default Receive receiveIndexUpdated() {
return receiveJsonFromFile("index-ok-updated") //
.receive(Receive::ok);
}
default Receive receiveSearchOk() {
return receiveJsonFromFile("search-ok-no-hits") //
.receive(Receive::ok);
}
default Receive receiveGetByIdNotFound() {
return receiveJsonFromFile("get-by-id-no-hit") //
.receive(response -> {
Mockito.when(response.statusCode()).thenReturn(HttpStatus.ACCEPTED, HttpStatus.NOT_FOUND);
});
}
default Receive receiveGetById() {
return receiveJsonFromFile("get-by-id-ok") //
.receive(Receive::ok);
}
default Receive receiveUpdateOk() {
return receiveJsonFromFile("update-ok-updated") //
.receive(Receive::ok);
}
default Receive receiveDeleteOk() {
return receiveJsonFromFile("update-ok-deleted") //
.receive(Receive::ok);
}
default Receive updateFail() {
return receiveJsonFromFile("update-error-not-found") //
.receive(response -> {
Mockito.when(response.statusCode()).thenReturn(HttpStatus.ACCEPTED, HttpStatus.NOT_FOUND);
});
}
}
public interface Receive {
Receive receive(Consumer<ClientResponse> response);
default Receive body(String json) {
return body(() -> json.getBytes(StandardCharsets.UTF_8));
}
default Receive body(Supplier<byte[]> json) {
return body(new DefaultDataBufferFactory().wrap(json.get()));
}
default Receive body(Resource resource) {
return body(() -> {
try {
return StreamUtils.copyToByteArray(resource.getInputStream());
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
default Receive body(DataBuffer dataBuffer) {
return receive(response -> Mockito.when(response.body(any())).thenReturn(Flux.just(dataBuffer)));
}
static void ok(ClientResponse response) {
Mockito.when(response.statusCode()).thenReturn(HttpStatus.ACCEPTED);
}
static void error(ClientResponse response) {
Mockito.when(response.statusCode()).thenReturn(HttpStatus.INTERNAL_SERVER_ERROR);
}
static void notFound(ClientResponse response) {
Mockito.when(response.statusCode()).thenReturn(HttpStatus.NOT_FOUND);
}
static void json(ClientResponse response) {
ClientResponse.Headers headers = Mockito.mock(ClientResponse.Headers.class);
Mockito.when(headers.contentType()).thenReturn(Optional.of(MediaType.APPLICATION_JSON));
Mockito.when(response.headers()).thenReturn(headers);
}
static Resource fromPath(String filename) {
return new ClassPathResource("/org/springframework/data/elasticsearch/client/" + filename + ".json");
}
}
class CallbackImpl implements Send, Receive {
WebClient client;
RequestHeadersUriSpec headersUriSpec;
RequestBodyUriSpec bodyUriSpec;
ClientResponse responseDelegate;
public CallbackImpl(WebClient client, RequestHeadersUriSpec headersUriSpec, RequestBodyUriSpec bodyUriSpec,
ClientResponse responseDelegate) {
this.client = client;
this.headersUriSpec = headersUriSpec;
this.bodyUriSpec = bodyUriSpec;
this.responseDelegate = responseDelegate;
}
@Override
public Receive get(Consumer<RequestHeadersUriSpec> uriSpec) {
uriSpec.accept(headersUriSpec);
return this;
}
@Override
public Receive exchange(Consumer<RequestBodyUriSpec> bodySpec) {
bodySpec.accept(this.bodyUriSpec);
return this;
}
@Override
public Receive receive(Consumer<ClientResponse> response) {
response.accept(responseDelegate);
return this;
}
@Override
public WebClient client() {
return client;
}
}
}
}

View File

@ -0,0 +1,76 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client.reactive;
import static org.assertj.core.api.Assertions.assertThat;
import org.springframework.data.elasticsearch.client.NoReachableHostException;
import reactor.test.StepVerifier;
import org.junit.Before;
import org.junit.Test;
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
import org.springframework.data.elasticsearch.client.ElasticsearchHost.State;
import org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.MockDelegatingElasticsearchHostProvider;
import org.springframework.data.elasticsearch.client.reactive.ReactiveMockClientTestsUtils.WebClientProvider.Receive;
/**
* @author Christoph Strobl
* @currentRead Golden Fool - Robin Hobb
*/
public class SingleNodeHostProviderUnitTests {
static final String HOST_1 = ":9200";
MockDelegatingElasticsearchHostProvider<SingleNodeHostProvider> mock;
SingleNodeHostProvider provider;
@Before
public void setUp() {
mock = ReactiveMockClientTestsUtils.single(HOST_1);
provider = mock.getDelegate();
}
@Test // DATAES-488
public void refreshHostStateShouldUpdateNodeStateCorrectly() {
mock.when(HOST_1).receive(Receive::error);
provider.clusterInfo().as(StepVerifier::create).expectNextCount(1).verifyComplete();
assertThat(provider.getCachedHostState()).extracting(ElasticsearchHost::getState).isEqualTo(State.OFFLINE);
}
@Test // DATAES-488
public void getActiveReturnsFirstActiveHost() {
mock.when(HOST_1).receive(Receive::ok);
provider.clusterInfo().as(StepVerifier::create).expectNextCount(1).verifyComplete();
assertThat(provider.getCachedHostState()).extracting(ElasticsearchHost::getState).isEqualTo(State.ONLINE);
}
@Test // DATAES-488
public void getActiveErrorsWhenNoActiveHostFound() {
mock.when(HOST_1).receive(Receive::error);
provider.getActive().as(StepVerifier::create).expectError(NoReachableHostException.class);
}
}

View File

@ -0,0 +1,148 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core;
import static org.apache.commons.lang.RandomStringUtils.*;
import static org.assertj.core.api.Assertions.*;
import reactor.test.StepVerifier;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.springframework.data.elasticsearch.TestUtils;
import org.springframework.data.elasticsearch.core.query.Criteria;
import org.springframework.data.elasticsearch.core.query.CriteriaQuery;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder;
import org.springframework.data.elasticsearch.entities.SampleEntity;
/**
* @author Christoph Strobl
* @currentRead Golden Fool - Robin Hobb
*/
public class ReactiveElasticsearchTemplateTests {
private ElasticsearchRestTemplate restTemplate;
private ReactiveElasticsearchTemplate template;
@Before
public void setUp() {
restTemplate = new ElasticsearchRestTemplate(TestUtils.restHighLevelClient());
TestUtils.deleteIndex("test-index-sample");
restTemplate.createIndex(SampleEntity.class);
restTemplate.putMapping(SampleEntity.class);
restTemplate.refresh(SampleEntity.class);
template = new ReactiveElasticsearchTemplate(TestUtils.reactiveClient());
}
@Test // DATAES-488
public void indexWithIdShouldWork() {
String documentId = randomNumeric(5);
SampleEntity sampleEntity = SampleEntity.builder().id(documentId).message("foo bar")
.version(System.currentTimeMillis()).build();
template.index(sampleEntity).as(StepVerifier::create).expectNextCount(1).verifyComplete();
restTemplate.refresh(SampleEntity.class);
List<SampleEntity> result = restTemplate
.queryForList(new CriteriaQuery(Criteria.where("message").is(sampleEntity.getMessage())), SampleEntity.class);
assertThat(result).hasSize(1);
}
@Test // DATAES-488
public void getShouldReturnEntity() {
String documentId = randomNumeric(5);
SampleEntity sampleEntity = SampleEntity.builder().id(documentId).message("some message")
.version(System.currentTimeMillis()).build();
IndexQuery indexQuery = getIndexQuery(sampleEntity);
restTemplate.index(indexQuery);
restTemplate.refresh(SampleEntity.class);
template.get(documentId, SampleEntity.class) //
.as(StepVerifier::create) //
.expectNext(sampleEntity) //
.verifyComplete();
}
@Test // DATAES-488
public void getForNothing() {
String documentId = randomNumeric(5);
SampleEntity sampleEntity = SampleEntity.builder().id(documentId).message("some message")
.version(System.currentTimeMillis()).build();
IndexQuery indexQuery = getIndexQuery(sampleEntity);
restTemplate.index(indexQuery);
restTemplate.refresh(SampleEntity.class);
template.get("foo", SampleEntity.class) //
.as(StepVerifier::create) //
.verifyComplete();
}
@Test // DATAES-488
public void findShouldApplyCriteria() {
String documentId = randomNumeric(5);
SampleEntity sampleEntity = SampleEntity.builder().id(documentId).message("some message")
.version(System.currentTimeMillis()).build();
IndexQuery indexQuery = getIndexQuery(sampleEntity);
restTemplate.index(indexQuery);
restTemplate.refresh(SampleEntity.class);
CriteriaQuery criteriaQuery = new CriteriaQuery(Criteria.where("message").is("some message"));
template.query(criteriaQuery, SampleEntity.class) //
.as(StepVerifier::create) //
.expectNext(sampleEntity) //
.verifyComplete();
}
@Test // DATAES-488
public void findShouldReturnEmptyFluxIfNothingFound() {
String documentId = randomNumeric(5);
SampleEntity sampleEntity = SampleEntity.builder().id(documentId).message("some message")
.version(System.currentTimeMillis()).build();
IndexQuery indexQuery = getIndexQuery(sampleEntity);
restTemplate.index(indexQuery);
restTemplate.refresh(SampleEntity.class);
CriteriaQuery criteriaQuery = new CriteriaQuery(Criteria.where("message").is("foo"));
template.query(criteriaQuery, SampleEntity.class) //
.as(StepVerifier::create) //
.verifyComplete();
}
private IndexQuery getIndexQuery(SampleEntity sampleEntity) {
return new IndexQueryBuilder().withId(sampleEntity.getId()).withObject(sampleEntity)
.withVersion(sampleEntity.getVersion()).build();
}
}

View File

@ -59,7 +59,7 @@ public class SynonymRepositoryTests {
public void shouldDo() {
//given
SynonymEntity entry1 = new SynonymEntity();
entry1.setText("Elizabeth is the English queen");
entry1.setText("Elizabeth is the english queen");
SynonymEntity entry2 = new SynonymEntity();
entry2.setText("Other text");

View File

@ -0,0 +1,6 @@
{
"_index" : "twitter",
"_type" : "doc",
"_id" : "5",
"found" : false
}

View File

@ -0,0 +1,12 @@
{
"_index" : "twitter",
"_type" : "doc",
"_id" : "1",
"_version" : 1,
"found" : true,
"_source" : {
"user" : "kimchy",
"post_date" : "2009-11-15T13:12:00",
"message" : "Trying out Elasticsearch, so far so good?"
}
}

View File

@ -0,0 +1,14 @@
{
"_index": "twitter",
"_type": "doc",
"_id": "10",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 0,
"_primary_term": 1
}

View File

@ -0,0 +1,14 @@
{
"_index" : "twitter",
"_type" : "doc",
"_id" : "1",
"_version" : 2,
"result" : "updated",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 1,
"_primary_term" : 1
}

View File

@ -0,0 +1,17 @@
{
"cluster_name": "elasticsearch",
"cluster_uuid": "r1tpSEemQZiSVJbfAqOLjg",
"name": "T14BIoj",
"tagline": "You Know, for Search",
"version": {
"build_date": "2018-08-17T23:18:47.308994Z",
"build_flavor": "default",
"build_hash": "595516e",
"build_snapshot": false,
"build_type": "tar",
"lucene_version": "7.4.0",
"minimum_index_compatibility_version": "5.0.0",
"minimum_wire_compatibility_version": "5.6.0",
"number": "6.4.0"
}
}

View File

@ -0,0 +1,34 @@
{
"docs": [
{
"_index": "twitter",
"_type": "doc",
"_id": "1",
"_version": 1,
"found": true,
"_source": {
"user": "kimchy",
"post_date": "2009-11-15T13:12:00",
"message": "Trying out Elasticsearch, so far so good?"
}
},
{
"_index": "twitter",
"_type": "_doc",
"_id": "2",
"found": false
},
{
"_index": "twitter",
"_type": "doc",
"_id": "3",
"_version": 1,
"found": true,
"_source": {
"user": "elastic",
"post_date": "2010-01-15T01:46:38",
"message": "Building the site, should be kewl"
}
}
]
}

View File

@ -0,0 +1,28 @@
{
"docs": [
{
"_index": "twitter",
"_type": "doc",
"_id": "1",
"_version": 1,
"found": true,
"_source": {
"user": "kimchy",
"post_date": "2009-11-15T13:12:00",
"message": "Trying out Elasticsearch, so far so good?"
}
},
{
"_index": "twitter",
"_type": "doc",
"_id": "2",
"_version": 1,
"found": true,
"_source": {
"user": "kimchy",
"post_date": "2009-11-15T14:12:12",
"message": "Another tweet, will it be indexed?"
}
}
]
}

View File

@ -0,0 +1,38 @@
{
"took" : 52,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 2,
"max_score" : 0.2876821,
"hits" : [
{
"_index" : "twitter",
"_type" : "doc",
"_id" : "2",
"_score" : 0.2876821,
"_source" : {
"user" : "kimchy",
"post_date" : "2009-11-15T14:12:12",
"message" : "Another tweet, will it be indexed?"
}
},
{
"_index" : "twitter",
"_type" : "doc",
"_id" : "1",
"_score" : 0.2876821,
"_source" : {
"user" : "kimchy",
"post_date" : "2009-11-15T13:12:00",
"message" : "Trying out Elasticsearch, so far so good?"
}
}
]
}
}

View File

@ -0,0 +1,15 @@
{
"took" : 226,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 0,
"max_score" : null,
"hits" : [ ]
}
}

View File

@ -0,0 +1,27 @@
{
"took" : 52,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 1,
"max_score" : 0.2876821,
"hits" : [
{
"_index" : "twitter",
"_type" : "doc",
"_id" : "2",
"_score" : 0.2876821,
"_source" : {
"user" : "kimchy",
"post_date" : "2009-11-15T14:12:12",
"message" : "Another tweet, will it be indexed?"
}
}
]
}
}

View File

@ -0,0 +1,19 @@
{
"error": {
"index": "twitter",
"index_uuid": "C91lAFXcRR6GTpYv6QIJFQ",
"reason": "[doc][101]: document missing",
"root_cause": [
{
"index": "twitter",
"index_uuid": "C91lAFXcRR6GTpYv6QIJFQ",
"reason": "[doc][101]: document missing",
"shard": "1",
"type": "document_missing_exception"
}
],
"shard": "1",
"type": "document_missing_exception"
},
"status": 404
}

View File

@ -0,0 +1,14 @@
{
"_id": "1",
"_index": "twitter",
"_primary_term": 4,
"_seq_no": 2,
"_shards": {
"failed": 0,
"successful": 1,
"total": 2
},
"_type": "doc",
"_version": 1,
"result": "deleted"
}

View File

@ -0,0 +1,14 @@
{
"_index": "twitter",
"_type": "doc",
"_id": "1",
"_version": 2,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 2,
"_primary_term": 4
}

View File

@ -14,13 +14,13 @@
"filter": {
"synonym_filter": {
"type": "synonym",
"lenient" : true,
"synonyms": [
"british,english",
"queen,monarch"
],
"ignore_case": "true"
]
}
}
}
}
}
}

View File

@ -0,0 +1,45 @@
# Elasticsearch plugin descriptor file
# This file must exist as 'plugin-descriptor.properties' inside a plugin.
#
### example plugin for "foo"
#
# foo.zip <-- zip file for the plugin, with this structure:
# |____ <arbitrary name1>.jar <-- classes, resources, dependencies
# |____ <arbitrary nameN>.jar <-- any number of jars
# |____ plugin-descriptor.properties <-- example contents below:
#
# classname=foo.bar.BazPlugin
# description=My cool plugin
# version=6.0
# elasticsearch.version=6.0
# java.version=1.8
#
### mandatory elements for all plugins:
#
# 'description': simple summary of the plugin
description=Adds "built in" analyzers to Elasticsearch.
#
# 'version': plugin's version
version=6.5.0
#
# 'name': the plugin name
name=analysis-common
#
# 'classname': the name of the class to load, fully-qualified.
classname=org.elasticsearch.analysis.common.CommonAnalysisPlugin
#
# 'java.version': version of java the code is built against
# use the system property java.specification.version
# version string must be a sequence of nonnegative decimal integers
# separated by "."'s and may have leading zeros
java.version=1.8
#
# 'elasticsearch.version': version of elasticsearch compiled against
elasticsearch.version=6.5.0
### optional elements for plugins:
#
# 'extended.plugins': other plugins this plugin extends through SPI
extended.plugins=lang-painless
#
# 'has.native.controller': whether or not the plugin has a native controller
has.native.controller=false

View File

@ -0,0 +1,45 @@
# Elasticsearch plugin descriptor file
# This file must exist as 'plugin-descriptor.properties' inside a plugin.
#
### example plugin for "foo"
#
# foo.zip <-- zip file for the plugin, with this structure:
# |____ <arbitrary name1>.jar <-- classes, resources, dependencies
# |____ <arbitrary nameN>.jar <-- any number of jars
# |____ plugin-descriptor.properties <-- example contents below:
#
# classname=foo.bar.BazPlugin
# description=My cool plugin
# version=6.0
# elasticsearch.version=6.0
# java.version=1.8
#
### mandatory elements for all plugins:
#
# 'description': simple summary of the plugin
description=Module for ingest processors that do not require additional security permissions or have large dependencies and resources
#
# 'version': plugin's version
version=6.5.0
#
# 'name': the plugin name
name=ingest-common
#
# 'classname': the name of the class to load, fully-qualified.
classname=org.elasticsearch.ingest.common.IngestCommonPlugin
#
# 'java.version': version of java the code is built against
# use the system property java.specification.version
# version string must be a sequence of nonnegative decimal integers
# separated by "."'s and may have leading zeros
java.version=1.8
#
# 'elasticsearch.version': version of elasticsearch compiled against
elasticsearch.version=6.5.0
### optional elements for plugins:
#
# 'extended.plugins': other plugins this plugin extends through SPI
extended.plugins=lang-painless
#
# 'has.native.controller': whether or not the plugin has a native controller
has.native.controller=false

View File

@ -1,20 +1,18 @@
# Elasticsearch plugin descriptor file
# This file must exist as 'plugin-descriptor.properties' in a folder named `elasticsearch`
# inside all plugins.
# This file must exist as 'plugin-descriptor.properties' inside a plugin.
#
### example plugin for "foo"
#
# foo.zip <-- zip file for the plugin, with this structure:
#|____elasticsearch/
#| |____ <arbitrary name1>.jar <-- classes, resources, dependencies
#| |____ <arbitrary nameN>.jar <-- any number of jars
#| |____ plugin-descriptor.properties <-- example contents below:
# |____ <arbitrary name1>.jar <-- classes, resources, dependencies
# |____ <arbitrary nameN>.jar <-- any number of jars
# |____ plugin-descriptor.properties <-- example contents below:
#
# classname=foo.bar.BazPlugin
# description=My cool plugin
# version=2.0
# elasticsearch.version=2.0
# java.version=1.7
# version=6.0
# elasticsearch.version=6.0
# java.version=1.8
#
### mandatory elements for all plugins:
#
@ -22,7 +20,7 @@
description=Lucene expressions integration for Elasticsearch
#
# 'version': plugin's version
version=6.2.2
version=6.5.0
#
# 'name': the plugin name
name=lang-expression
@ -37,7 +35,7 @@ classname=org.elasticsearch.script.expression.ExpressionPlugin
java.version=1.8
#
# 'elasticsearch.version': version of elasticsearch compiled against
elasticsearch.version=6.3.0
elasticsearch.version=6.5.0
### optional elements for plugins:
#
# 'extended.plugins': other plugins this plugin extends through SPI
@ -45,6 +43,3 @@ extended.plugins=
#
# 'has.native.controller': whether or not the plugin has a native controller
has.native.controller=false
#
# 'requires.keystore': whether or not the plugin needs the elasticsearch keystore be created
#requires.keystore=false

View File

@ -0,0 +1,45 @@
# Elasticsearch plugin descriptor file
# This file must exist as 'plugin-descriptor.properties' inside a plugin.
#
### example plugin for "foo"
#
# foo.zip <-- zip file for the plugin, with this structure:
# |____ <arbitrary name1>.jar <-- classes, resources, dependencies
# |____ <arbitrary nameN>.jar <-- any number of jars
# |____ plugin-descriptor.properties <-- example contents below:
#
# classname=foo.bar.BazPlugin
# description=My cool plugin
# version=6.0
# elasticsearch.version=6.0
# java.version=1.8
#
### mandatory elements for all plugins:
#
# 'description': simple summary of the plugin
description=An easy, safe and fast scripting language for Elasticsearch
#
# 'version': plugin's version
version=6.5.0
#
# 'name': the plugin name
name=lang-painless
#
# 'classname': the name of the class to load, fully-qualified.
classname=org.elasticsearch.painless.PainlessPlugin
#
# 'java.version': version of java the code is built against
# use the system property java.specification.version
# version string must be a sequence of nonnegative decimal integers
# separated by "."'s and may have leading zeros
java.version=1.8
#
# 'elasticsearch.version': version of elasticsearch compiled against
elasticsearch.version=6.5.0
### optional elements for plugins:
#
# 'extended.plugins': other plugins this plugin extends through SPI
extended.plugins=
#
# 'has.native.controller': whether or not the plugin has a native controller
has.native.controller=false

View File

@ -0,0 +1,26 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
grant {
// needed to generate runtime classes
permission java.lang.RuntimePermission "createClassLoader";
// needed to find the classloader to load whitelisted classes from
permission java.lang.RuntimePermission "getClassLoader";
};

View File

@ -0,0 +1,45 @@
# Elasticsearch plugin descriptor file
# This file must exist as 'plugin-descriptor.properties' inside a plugin.
#
### example plugin for "foo"
#
# foo.zip <-- zip file for the plugin, with this structure:
# |____ <arbitrary name1>.jar <-- classes, resources, dependencies
# |____ <arbitrary nameN>.jar <-- any number of jars
# |____ plugin-descriptor.properties <-- example contents below:
#
# classname=foo.bar.BazPlugin
# description=My cool plugin
# version=6.0
# elasticsearch.version=6.0
# java.version=1.8
#
### mandatory elements for all plugins:
#
# 'description': simple summary of the plugin
description=Adds advanced field mappers
#
# 'version': plugin's version
version=6.5.0
#
# 'name': the plugin name
name=mapper-extras
#
# 'classname': the name of the class to load, fully-qualified.
classname=org.elasticsearch.index.mapper.MapperExtrasPlugin
#
# 'java.version': version of java the code is built against
# use the system property java.specification.version
# version string must be a sequence of nonnegative decimal integers
# separated by "."'s and may have leading zeros
java.version=1.8
#
# 'elasticsearch.version': version of elasticsearch compiled against
elasticsearch.version=6.5.0
### optional elements for plugins:
#
# 'extended.plugins': other plugins this plugin extends through SPI
extended.plugins=
#
# 'has.native.controller': whether or not the plugin has a native controller
has.native.controller=false

View File

@ -0,0 +1,45 @@
# Elasticsearch plugin descriptor file
# This file must exist as 'plugin-descriptor.properties' inside a plugin.
#
### example plugin for "foo"
#
# foo.zip <-- zip file for the plugin, with this structure:
# |____ <arbitrary name1>.jar <-- classes, resources, dependencies
# |____ <arbitrary nameN>.jar <-- any number of jars
# |____ plugin-descriptor.properties <-- example contents below:
#
# classname=foo.bar.BazPlugin
# description=My cool plugin
# version=6.0
# elasticsearch.version=6.0
# java.version=1.8
#
### mandatory elements for all plugins:
#
# 'description': simple summary of the plugin
description=Module for URL repository
#
# 'version': plugin's version
version=6.5.0
#
# 'name': the plugin name
name=repository-url
#
# 'classname': the name of the class to load, fully-qualified.
classname=org.elasticsearch.plugin.repository.url.URLRepositoryPlugin
#
# 'java.version': version of java the code is built against
# use the system property java.specification.version
# version string must be a sequence of nonnegative decimal integers
# separated by "."'s and may have leading zeros
java.version=1.8
#
# 'elasticsearch.version': version of elasticsearch compiled against
elasticsearch.version=6.5.0
### optional elements for plugins:
#
# 'extended.plugins': other plugins this plugin extends through SPI
extended.plugins=
#
# 'has.native.controller': whether or not the plugin has a native controller
has.native.controller=false

View File

@ -0,0 +1,22 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
grant {
permission java.net.SocketPermission "*", "connect";
};