Introduce cluster operations.

Original Pull Request: #1768 
Closes #1390
This commit is contained in:
Peter-Josef Meisch 2021-04-11 11:05:37 +02:00 committed by GitHub
parent 58bca88386
commit d561c91678
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 842 additions and 70 deletions

View File

@ -45,6 +45,8 @@ import javax.net.ssl.SSLContext;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
@ -105,6 +107,7 @@ import org.springframework.data.elasticsearch.client.ClientLogger;
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
import org.springframework.data.elasticsearch.client.NoReachableHostException;
import org.springframework.data.elasticsearch.client.reactive.HostProvider.Verification;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Cluster;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices;
import org.springframework.data.elasticsearch.client.util.NamedXContents;
import org.springframework.data.elasticsearch.client.util.ScrollState;
@ -142,7 +145,7 @@ import org.springframework.web.reactive.function.client.WebClient.RequestBodySpe
* @see ClientConfiguration
* @see ReactiveRestClients
*/
public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient, Indices {
public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient, Indices, Cluster {
private final HostProvider<?> hostProvider;
private final RequestCreator requestCreator;
@ -297,10 +300,6 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
this.headersSupplier = headersSupplier;
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders)
*/
@Override
public Mono<Boolean> ping(HttpHeaders headers) {
@ -309,10 +308,6 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
.onErrorResume(NoReachableHostException.class, error -> Mono.just(false)).next();
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#info(org.springframework.http.HttpHeaders)
*/
@Override
public Mono<MainResponse> info(HttpHeaders headers) {
@ -320,10 +315,6 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
.next();
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#get(org.springframework.http.HttpHeaders, org.elasticsearch.action.get.GetRequest)
*/
@Override
public Mono<GetResult> get(HttpHeaders headers, GetRequest getRequest) {
@ -341,10 +332,6 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
.flatMap(Flux::fromArray); //
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#exists(org.springframework.http.HttpHeaders, org.elasticsearch.action.get.GetRequest)
*/
@Override
public Mono<Boolean> exists(HttpHeaders headers, GetRequest getRequest) {
@ -353,37 +340,26 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
.next();
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.index.IndexRequest)
*/
@Override
public Mono<IndexResponse> index(HttpHeaders headers, IndexRequest indexRequest) {
return sendRequest(indexRequest, requestCreator.index(), IndexResponse.class, headers).next();
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#indices()
*/
@Override
public Indices indices() {
return this;
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.update.UpdateRequest)
*/
@Override
public Cluster cluster() {
return this;
}
@Override
public Mono<UpdateResponse> update(HttpHeaders headers, UpdateRequest updateRequest) {
return sendRequest(updateRequest, requestCreator.update(), UpdateResponse.class, headers).next();
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.delete.DeleteRequest)
*/
@Override
public Mono<DeleteResponse> delete(HttpHeaders headers, DeleteRequest deleteRequest) {
@ -391,10 +367,6 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
.next();
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#count(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
*/
@Override
public Mono<Long> count(HttpHeaders headers, SearchRequest searchRequest) {
searchRequest.source().trackTotalHits(true);
@ -412,10 +384,6 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
.map(response -> response.getResponse().getHits()).flatMap(Flux::fromIterable);
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
*/
@Override
public Flux<SearchHit> search(HttpHeaders headers, SearchRequest searchRequest) {
@ -435,10 +403,6 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
.map(SearchResponse::getSuggest);
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#aggregate(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
*/
@Override
public Flux<Aggregation> aggregate(HttpHeaders headers, SearchRequest searchRequest) {
@ -453,10 +417,6 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
.flatMap(Flux::fromIterable);
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#scroll(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
*/
@Override
public Flux<SearchHit> scroll(HttpHeaders headers, SearchRequest searchRequest) {
@ -506,10 +466,6 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
return sendRequest(clearScrollRequest, requestCreator.clearScroll(), ClearScrollResponse.class, headers);
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.index.reindex.DeleteByQueryRequest)
*/
@Override
public Mono<BulkByScrollResponse> deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest) {
@ -524,10 +480,6 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
.map(ByQueryResponse::of);
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#bulk(org.springframework.http.HttpHeaders, org.elasticsearch.action.bulk.BulkRequest)
*/
@Override
public Mono<BulkResponse> bulk(HttpHeaders headers, BulkRequest bulkRequest) {
return sendRequest(bulkRequest, requestCreator.bulk(), BulkResponse.class, headers) //
@ -812,6 +764,14 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
// endregion
// region cluster operations
@Override
public Mono<ClusterHealthResponse> health(HttpHeaders headers, ClusterHealthRequest clusterHealthRequest) {
return sendRequest(clusterHealthRequest, requestCreator.clusterHealth(), ClusterHealthResponse.class, headers)
.next();
}
// endregion
// region helper functions
private <T> Publisher<? extends T> readResponseBody(String logId, Request request, ClientResponse response,
Class<T> responseType) {
@ -965,7 +925,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
} while (token == XContentParser.Token.FIELD_NAME);
return null;
} catch (IOException e) {
} catch (Exception e) {
return new ElasticsearchStatusException(content, status);
}
}

View File

@ -22,6 +22,8 @@ import java.net.ConnectException;
import java.util.Collection;
import java.util.function.Consumer;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
@ -269,6 +271,14 @@ public interface ReactiveElasticsearchClient {
*/
Indices indices();
/**
* Gain Access to cluster related commands.
*
* @return Cluster implementations
* @since 4.2
*/
Cluster cluster();
/**
* Execute an {@link UpdateRequest} against the {@literal update} API to alter a document.
*
@ -1678,4 +1688,45 @@ public interface ReactiveElasticsearchClient {
*/
Mono<GetIndexResponse> getIndex(HttpHeaders headers, GetIndexRequest getIndexRequest);
}
/**
* Encapsulation of methods for accessing the Cluster API.
*
* @author Peter-Josef Meisch
* @since 4.2
*/
interface Cluster {
/**
* Execute the given {{@link ClusterHealthRequest}} against the {@literal cluster} API.
*
* @param consumer never {@literal null}.
* @return Mono emitting the {@link ClusterHealthResponse}.
*/
default Mono<ClusterHealthResponse> health(Consumer<ClusterHealthRequest> consumer) {
ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest();
consumer.accept(clusterHealthRequest);
return health(clusterHealthRequest);
}
/**
* Execute the given {{@link ClusterHealthRequest}} against the {@literal cluster} API.
*
* @param clusterHealthRequest must not be {@literal null} // * @return Mono emitting the
* {@link ClusterHealthResponse}.
*/
default Mono<ClusterHealthResponse> health(ClusterHealthRequest clusterHealthRequest) {
return health(HttpHeaders.EMPTY, clusterHealthRequest);
}
/**
* Execute the given {{@link ClusterHealthRequest}} against the {@literal cluster} API.
*
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
* @param clusterHealthRequest must not be {@literal null} // * @return Mono emitting the
* {@link ClusterHealthResponse}.
*/
Mono<ClusterHealthResponse> health(HttpHeaders headers, ClusterHealthRequest clusterHealthRequest);
}
}

View File

@ -3,6 +3,7 @@ package org.springframework.data.elasticsearch.client.reactive;
import java.io.IOException;
import java.util.function.Function;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
@ -266,4 +267,11 @@ public interface RequestCreator {
default Function<GetIndexRequest, Request> getIndex() {
return RequestConverters::getIndex;
}
/**
* @since 4.2
*/
default Function<ClusterHealthRequest, Request> clusterHealth() {
return RequestConverters::clusterHealth;
}
}

View File

@ -107,8 +107,9 @@ import org.springframework.lang.Nullable;
/**
* <p>
* Original implementation source {@link org.elasticsearch.client.RequestConverters} and
* {@link org.elasticsearch.client.IndicesRequestConverters} by {@literal Elasticsearch}
* Original implementation source {@link org.elasticsearch.client.RequestConverters},
* {@link org.elasticsearch.client.IndicesRequestConverters} and
* {@link org.elasticsearch.client.ClusterRequestConverters} by {@literal Elasticsearch}
* (<a href="https://www.elastic.co">https://www.elastic.co</a>) licensed under the Apache License, Version 2.0.
* </p>
* Modified for usage with {@link ReactiveElasticsearchClient}.
@ -1003,6 +1004,26 @@ public class RequestConverters {
return request;
}
public static Request clusterHealth(ClusterHealthRequest healthRequest) {
String[] indices = healthRequest.indices() == null ? Strings.EMPTY_ARRAY : healthRequest.indices();
String endpoint = new EndpointBuilder().addPathPartAsIs(new String[] { "_cluster/health" })
.addCommaSeparatedPathParts(indices).build();
Request request = new Request("GET", endpoint);
RequestConverters.Params parameters = new Params(request);
parameters.withWaitForStatus(healthRequest.waitForStatus());
parameters.withWaitForNoRelocatingShards(healthRequest.waitForNoRelocatingShards());
parameters.withWaitForNoInitializingShards(healthRequest.waitForNoInitializingShards());
parameters.withWaitForActiveShards(healthRequest.waitForActiveShards(), ActiveShardCount.NONE);
parameters.withWaitForNodes(healthRequest.waitForNodes());
parameters.withWaitForEvents(healthRequest.waitForEvents());
parameters.withTimeout(healthRequest.timeout());
parameters.withMasterTimeout(healthRequest.masterNodeTimeout());
parameters.withLocal(healthRequest.local()).withLevel(healthRequest.level());
return request;
}
static HttpEntity createEntity(ToXContent toXContent, XContentType xContentType) {
try {

View File

@ -17,6 +17,7 @@ package org.springframework.data.elasticsearch.core;
import java.util.Objects;
import org.springframework.data.elasticsearch.core.cluster.ClusterOperations;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.routing.RoutingResolver;
@ -39,25 +40,34 @@ public interface ElasticsearchOperations extends DocumentOperations, SearchOpera
/**
* get an {@link IndexOperations} that is bound to the given class
*
*
* @return IndexOperations
*/
IndexOperations indexOps(Class<?> clazz);
/**
* get an {@link IndexOperations} that is bound to the given index
*
*
* @return IndexOperations
*/
IndexOperations indexOps(IndexCoordinates index);
/**
* return a {@link ClusterOperations} instance that uses the same client communication setup as this
* ElasticsearchOperations instance.
*
* @return ClusterOperations implementation
* @since 4.2
*/
ClusterOperations cluster();
ElasticsearchConverter getElasticsearchConverter();
IndexCoordinates getIndexCoordinatesFor(Class<?> clazz);
/**
* gets the routing for an entity which might be defined by a join-type relation
*
*
* @param entity the entity
* @return the routing, may be null if not set.
* @since 4.1
@ -68,7 +78,7 @@ public interface ElasticsearchOperations extends DocumentOperations, SearchOpera
// region helper
/**
* gets the String representation for an id.
*
*
* @param id
* @return
* @since 4.0

View File

@ -44,6 +44,7 @@ import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.elasticsearch.core.cluster.ClusterOperations;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.document.DocumentAdapters;
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
@ -142,6 +143,13 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate {
}
// endregion
// region ClusterOperations
@Override
public ClusterOperations cluster() {
return ClusterOperations.forTemplate(this);
}
// endregion
// region DocumentOperations
public String doIndex(IndexQuery query, IndexCoordinates index) {

View File

@ -41,6 +41,7 @@ import org.elasticsearch.index.reindex.UpdateByQueryRequestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.elasticsearch.core.cluster.ClusterOperations;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.document.DocumentAdapters;
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
@ -142,6 +143,13 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate {
}
// endregion
// region ClusterOperations
@Override
public ClusterOperations cluster() {
return ClusterOperations.forTemplate(this);
}
// endregion
// region getter/setter
@Nullable
public String getSearchTimeout() {

View File

@ -17,6 +17,7 @@ package org.springframework.data.elasticsearch.core;
import org.reactivestreams.Publisher;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.core.cluster.ReactiveClusterOperations;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
@ -56,6 +57,16 @@ public interface ReactiveElasticsearchOperations extends ReactiveDocumentOperati
*/
<T> Publisher<T> executeWithIndicesClient(IndicesClientCallback<Publisher<T>> callback);
/**
* Execute within a {@link ClusterClientCallback} managing resources and translating errors.
*
* @param callback must not be {@literal null}.
* @param <T> the type the Publisher emits
* @return the {@link Publisher} emitting results.
* @since 4.1
*/
<T> Publisher<T> executeWithClusterClient(ClusterClientCallback<Publisher<T>> callback);
/**
* Get the {@link ElasticsearchConverter} used.
*
@ -75,6 +86,7 @@ public interface ReactiveElasticsearchOperations extends ReactiveDocumentOperati
/**
* Creates a {@link ReactiveIndexOperations} that is bound to the given index
*
* @param index IndexCoordinates specifying the index
* @return ReactiveIndexOperations implementation
* @since 4.1
@ -83,13 +95,23 @@ public interface ReactiveElasticsearchOperations extends ReactiveDocumentOperati
/**
* Creates a {@link ReactiveIndexOperations} that is bound to the given class
*
* @param clazz the entity clazz specifiying the index information
* @return ReactiveIndexOperations implementation
* @since 4.1
*/
ReactiveIndexOperations indexOps(Class<?> clazz);
//region routing
/**
* return a {@link ReactiveClusterOperations} instance that uses the same client communication setup as this
* ElasticsearchOperations instance.
*
* @return ClusterOperations implementation
* @since 4.2
*/
ReactiveClusterOperations cluster();
// region routing
/**
* Returns a copy of this instance with the same configuration, but that uses a different {@link RoutingResolver} to
* obtain routing information.
@ -98,7 +120,7 @@ public interface ReactiveElasticsearchOperations extends ReactiveDocumentOperati
* @return DocumentOperations instance
*/
ReactiveElasticsearchOperations withRouting(RoutingResolver routingResolver);
//endregion
// endregion
/**
* Callback interface to be used with {@link #execute(ClientCallback)} for operating directly on
@ -114,8 +136,8 @@ public interface ReactiveElasticsearchOperations extends ReactiveDocumentOperati
}
/**
* Callback interface to be used with {@link #executeWithIndicesClient(IndicesClientCallback)} for operating directly on
* {@link ReactiveElasticsearchClient.Indices}.
* Callback interface to be used with {@link #executeWithIndicesClient(IndicesClientCallback)} for operating directly
* on {@link ReactiveElasticsearchClient.Indices}.
*
* @param <T> the return type
* @since 4.1
@ -123,4 +145,15 @@ public interface ReactiveElasticsearchOperations extends ReactiveDocumentOperati
interface IndicesClientCallback<T extends Publisher<?>> {
T doWithClient(ReactiveElasticsearchClient.Indices client);
}
/**
* Callback interface to be used with {@link #executeWithClusterClient(ClusterClientCallback)} for operating directly
* on {@link ReactiveElasticsearchClient.Cluster}.
*
* @param <T> the return type
* @since 4.2
*/
interface ClusterClientCallback<T extends Publisher<?>> {
T doWithClient(ReactiveElasticsearchClient.Cluster client);
}
}

View File

@ -57,6 +57,8 @@ import org.springframework.data.elasticsearch.NoSuchIndexException;
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity;
import org.springframework.data.elasticsearch.core.cluster.DefaultReactiveClusterOperations;
import org.springframework.data.elasticsearch.core.cluster.ReactiveClusterOperations;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
import org.springframework.data.elasticsearch.core.document.Document;
@ -921,6 +923,11 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
return Flux.defer(() -> callback.doWithClient(getIndicesClient())).onErrorMap(this::translateException);
}
@Override
public <T> Publisher<T> executeWithClusterClient(ClusterClientCallback<Publisher<T>> callback) {
return Flux.defer(() -> callback.doWithClient(getClusterClient())).onErrorMap(this::translateException);
}
@Override
public ElasticsearchConverter getElasticsearchConverter() {
return converter;
@ -936,6 +943,11 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
return new DefaultReactiveIndexOperations(this, clazz);
}
@Override
public ReactiveClusterOperations cluster() {
return new DefaultReactiveClusterOperations(this);
}
@Override
public IndexCoordinates getIndexCoordinatesFor(Class<?> clazz) {
return getPersistentEntityFor(clazz).getIndexCoordinates();
@ -970,7 +982,19 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
throw new UncategorizedElasticsearchException("No ReactiveElasticsearchClient.Indices implementation available");
}
// endregion
/**
* Obtain the {@link ReactiveElasticsearchClient.Cluster} to operate upon.
*
* @return never {@literal null}.
*/
protected ReactiveElasticsearchClient.Cluster getClusterClient() {
if (client instanceof ReactiveElasticsearchClient.Cluster) {
return (ReactiveElasticsearchClient.Cluster) client;
}
throw new UncategorizedElasticsearchException("No ReactiveElasticsearchClient.Cluster implementation available");
}
/**
* translates an Exception if possible. Exceptions that are no {@link RuntimeException}s are wrapped in a

View File

@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetResponse;
@ -36,6 +37,7 @@ import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.compress.CompressedXContent;
import org.springframework.data.elasticsearch.core.cluster.ClusterHealth;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.index.AliasData;
import org.springframework.data.elasticsearch.core.index.Settings;
@ -109,7 +111,7 @@ public class ResponseConverter {
* @return a document that represents {@link Settings}
*/
private static Settings settingsFromGetIndexResponse(GetIndexResponse getIndexResponse, String indexName) {
Settings settings= new Settings();
Settings settings = new Settings();
org.elasticsearch.common.settings.Settings indexSettings = getIndexResponse.getSettings().get(indexName);
@ -289,4 +291,26 @@ public class ResponseConverter {
}
// endregion
// region cluster operations
public static ClusterHealth clusterHealth(ClusterHealthResponse clusterHealthResponse) {
return ClusterHealth.builder() //
.withActivePrimaryShards(clusterHealthResponse.getActivePrimaryShards()) //
.withActiveShards(clusterHealthResponse.getActiveShards()) //
.withActiveShardsPercent(clusterHealthResponse.getActiveShardsPercent()) //
.withClusterName(clusterHealthResponse.getClusterName()) //
.withDelayedUnassignedShards(clusterHealthResponse.getDelayedUnassignedShards()) //
.withInitializingShards(clusterHealthResponse.getInitializingShards()) //
.withNumberOfDataNodes(clusterHealthResponse.getNumberOfDataNodes()) //
.withNumberOfInFlightFetch(clusterHealthResponse.getNumberOfInFlightFetch()) //
.withNumberOfNodes(clusterHealthResponse.getNumberOfNodes()) //
.withNumberOfPendingTasks(clusterHealthResponse.getNumberOfPendingTasks()) //
.withRelocatingShards(clusterHealthResponse.getRelocatingShards()) //
.withStatus(clusterHealthResponse.getStatus().toString()) //
.withTaskMaxWaitingTimeMillis(clusterHealthResponse.getTaskMaxWaitingTime().millis()) //
.withTimedOut(clusterHealthResponse.isTimedOut()) //
.withUnassignedShards(clusterHealthResponse.getUnassignedShards()) //
.build(); //
}
// endregion
}

View File

@ -0,0 +1,248 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.cluster;
/**
* Information about the cluster health. Contains currently only the top level elements returned from Elasticsearch.
*
* @author Peter-Josef Meisch
* @since 4.2
*/
public class ClusterHealth {
private final String clusterName;
private final String status;
private final int numberOfNodes;
private final int numberOfDataNodes;
private final int activeShards;
private final int relocatingShards;
private final int activePrimaryShards;
private final int initializingShards;
private final int unassignedShards;
private final double activeShardsPercent;
private final int numberOfPendingTasks;
private final boolean timedOut;
private final int numberOfInFlightFetch;
private final int delayedUnassignedShards;
private final long taskMaxWaitingTimeMillis;
private ClusterHealth(String clusterName, String status, int numberOfNodes, int numberOfDataNodes, int activeShards,
int relocatingShards, int activePrimaryShards, int initializingShards, int unassignedShards,
double activeShardsPercent, int numberOfPendingTasks, boolean timedOut, int numberOfInFlightFetch,
int delayedUnassignedShards, long taskMaxWaitingTimeMillis) {
this.clusterName = clusterName;
this.status = status;
this.numberOfNodes = numberOfNodes;
this.numberOfDataNodes = numberOfDataNodes;
this.activeShards = activeShards;
this.relocatingShards = relocatingShards;
this.activePrimaryShards = activePrimaryShards;
this.initializingShards = initializingShards;
this.unassignedShards = unassignedShards;
this.activeShardsPercent = activeShardsPercent;
this.numberOfPendingTasks = numberOfPendingTasks;
this.timedOut = timedOut;
this.numberOfInFlightFetch = numberOfInFlightFetch;
this.delayedUnassignedShards = delayedUnassignedShards;
this.taskMaxWaitingTimeMillis = taskMaxWaitingTimeMillis;
}
public String getClusterName() {
return clusterName;
}
public String getStatus() {
return status;
}
public int getNumberOfNodes() {
return numberOfNodes;
}
public int getNumberOfDataNodes() {
return numberOfDataNodes;
}
public int getActiveShards() {
return activeShards;
}
public int getRelocatingShards() {
return relocatingShards;
}
public int getActivePrimaryShards() {
return activePrimaryShards;
}
public int getInitializingShards() {
return initializingShards;
}
public int getUnassignedShards() {
return unassignedShards;
}
public double getActiveShardsPercent() {
return activeShardsPercent;
}
public int getNumberOfPendingTasks() {
return numberOfPendingTasks;
}
public boolean isTimedOut() {
return timedOut;
}
public int getNumberOfInFlightFetch() {
return numberOfInFlightFetch;
}
public int getDelayedUnassignedShards() {
return delayedUnassignedShards;
}
public long getTaskMaxWaitingTimeMillis() {
return taskMaxWaitingTimeMillis;
}
@Override
public String toString() {
return "ClusterHealth{" +
"clusterName='" + clusterName + '\'' +
", status='" + status + '\'' +
", numberOfNodes=" + numberOfNodes +
", numberOfDataNodes=" + numberOfDataNodes +
", activeShards=" + activeShards +
", relocatingShards=" + relocatingShards +
", activePrimaryShards=" + activePrimaryShards +
", initializingShards=" + initializingShards +
", unassignedShards=" + unassignedShards +
", activeShardsPercent=" + activeShardsPercent +
", numberOfPendingTasks=" + numberOfPendingTasks +
", timedOut=" + timedOut +
", numberOfInFlightFetch=" + numberOfInFlightFetch +
", delayedUnassignedShards=" + delayedUnassignedShards +
", taskMaxWaitingTimeMillis=" + taskMaxWaitingTimeMillis +
'}';
}
public static ClusterHealthBuilder builder() {
return new ClusterHealthBuilder();
}
public static final class ClusterHealthBuilder {
private String clusterName = "";
private String status = "";
private int numberOfNodes;
private int numberOfDataNodes;
private int activeShards;
private int relocatingShards;
private int activePrimaryShards;
private int initializingShards;
private int unassignedShards;
private double activeShardsPercent;
private int numberOfPendingTasks;
private boolean timedOut;
private int numberOfInFlightFetch;
private int delayedUnassignedShards;
private long taskMaxWaitingTimeMillis;
private ClusterHealthBuilder() {}
public ClusterHealthBuilder withClusterName(String clusterName) {
this.clusterName = clusterName;
return this;
}
public ClusterHealthBuilder withStatus(String status) {
this.status = status;
return this;
}
public ClusterHealthBuilder withNumberOfNodes(int numberOfNodes) {
this.numberOfNodes = numberOfNodes;
return this;
}
public ClusterHealthBuilder withNumberOfDataNodes(int numberOfDataNodes) {
this.numberOfDataNodes = numberOfDataNodes;
return this;
}
public ClusterHealthBuilder withActiveShards(int activeShards) {
this.activeShards = activeShards;
return this;
}
public ClusterHealthBuilder withRelocatingShards(int relocatingShards) {
this.relocatingShards = relocatingShards;
return this;
}
public ClusterHealthBuilder withActivePrimaryShards(int activePrimaryShards) {
this.activePrimaryShards = activePrimaryShards;
return this;
}
public ClusterHealthBuilder withInitializingShards(int initializingShards) {
this.initializingShards = initializingShards;
return this;
}
public ClusterHealthBuilder withUnassignedShards(int unassignedShards) {
this.unassignedShards = unassignedShards;
return this;
}
public ClusterHealthBuilder withActiveShardsPercent(double activeShardsPercent) {
this.activeShardsPercent = activeShardsPercent;
return this;
}
public ClusterHealthBuilder withNumberOfPendingTasks(int numberOfPendingTasks) {
this.numberOfPendingTasks = numberOfPendingTasks;
return this;
}
public ClusterHealthBuilder withTimedOut(boolean timedOut) {
this.timedOut = timedOut;
return this;
}
public ClusterHealthBuilder withNumberOfInFlightFetch(int numberOfInFlightFetch) {
this.numberOfInFlightFetch = numberOfInFlightFetch;
return this;
}
public ClusterHealthBuilder withDelayedUnassignedShards(int delayedUnassignedShards) {
this.delayedUnassignedShards = delayedUnassignedShards;
return this;
}
public ClusterHealthBuilder withTaskMaxWaitingTimeMillis(long taskMaxWaitingTimeMillis) {
this.taskMaxWaitingTimeMillis = taskMaxWaitingTimeMillis;
return this;
}
public ClusterHealth build() {
return new ClusterHealth(clusterName, status, numberOfNodes, numberOfDataNodes, activeShards, relocatingShards,
activePrimaryShards, initializingShards, unassignedShards, activeShardsPercent, numberOfPendingTasks,
timedOut, numberOfInFlightFetch, delayedUnassignedShards, taskMaxWaitingTimeMillis);
}
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.cluster;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.util.Assert;
/**
* Elasticsearch operations on cluster level.
*
* @author Peter-Josef Meisch
* @since 4.2
*/
public interface ClusterOperations {
/**
* Creates a ClusterOperations for a {@link ElasticsearchRestTemplate}.
*
* @param template the template, must not be {@literal null}
* @return ClusterOperations
*/
static ClusterOperations forTemplate(ElasticsearchRestTemplate template) {
Assert.notNull(template, "template must not be null");
return new DefaultClusterOperations(template);
}
/**
* Creates a ClusterOperations for a {@link ElasticsearchTemplate}.
*
* @param template the template, must not be {@literal null}
* @return ClusterOperations
*/
static ClusterOperations forTemplate(ElasticsearchTemplate template) {
Assert.notNull(template, "template must not be null");
return new DefaultTransportClusterOperations(template);
}
/**
* get the cluster's health status.
*
* @return health information for the cluster.
*/
ClusterHealth health();
}

View File

@ -0,0 +1,45 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.cluster;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.RequestOptions;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.ResponseConverter;
/**
* Default implementation of {@link ClusterOperations} using the {@link ElasticsearchRestTemplate}.
*
* @author Peter-Josef Meisch
* @since 4.2
*/
class DefaultClusterOperations implements ClusterOperations {
private final ElasticsearchRestTemplate template;
DefaultClusterOperations(ElasticsearchRestTemplate template) {
this.template = template;
}
@Override
public ClusterHealth health() {
ClusterHealthResponse clusterHealthResponse = template
.execute(client -> client.cluster().health(new ClusterHealthRequest(), RequestOptions.DEFAULT));
return ResponseConverter.clusterHealth(clusterHealthResponse);
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.cluster;
import reactor.core.publisher.Mono;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
import org.springframework.data.elasticsearch.core.ResponseConverter;
/**
* Default implementation of {@link ReactiveClusterOperations} using the {@link ReactiveElasticsearchOperations}.
*
* @author Peter-Josef Meisch
* @since 4.2
*/
public class DefaultReactiveClusterOperations implements ReactiveClusterOperations {
private final ReactiveElasticsearchOperations operations;
public DefaultReactiveClusterOperations(ReactiveElasticsearchOperations operations) {
this.operations = operations;
}
@Override
public Mono<ClusterHealth> health() {
return Mono.from(operations.executeWithClusterClient(
client -> client.health(new ClusterHealthRequest()).map(ResponseConverter::clusterHealth)));
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.cluster;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.ResponseConverter;
/**
* Default implementation of {@link ClusterOperations} using the
* {@link org.elasticsearch.client.transport.TransportClient}.
*
* @author Peter-Josef Meisch
* @since 4.2
*/
public class DefaultTransportClusterOperations implements ClusterOperations {
private final ElasticsearchTemplate template;
public DefaultTransportClusterOperations(ElasticsearchTemplate template) {
this.template = template;
}
@Override
public ClusterHealth health() {
ClusterHealthResponse clusterHealthResponse = template.getClient().admin().cluster()
.health(new ClusterHealthRequest()).actionGet();
return ResponseConverter.clusterHealth(clusterHealthResponse);
}
}

View File

@ -0,0 +1,34 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.cluster;
import reactor.core.publisher.Mono;
/**
* Reactive Elasticsearch operations on cluster level.
*
* @author Peter-Josef Meisch
* @since 4.2
*/
public interface ReactiveClusterOperations {
/**
* get the cluster's health status.
*
* @return a Mono emitting the health information for the cluster.
*/
Mono<ClusterHealth> health();
}

View File

@ -0,0 +1,6 @@
/**
* Interfaces and classes related to Elasticsearch cluster information and management.
*/
@org.springframework.lang.NonNullApi
@org.springframework.lang.NonNullFields
package org.springframework.data.elasticsearch.core.cluster;

View File

@ -0,0 +1,56 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.cluster;
import static org.assertj.core.api.Assertions.*;
import java.util.Arrays;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.junit.jupiter.ElasticsearchRestTemplateConfiguration;
import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest;
import org.springframework.test.context.ContextConfiguration;
/**
* @author Peter-Josef Meisch
*/
@SpringIntegrationTest
@ContextConfiguration(classes = { ElasticsearchRestTemplateConfiguration.class })
public class ClusterOperationsIntegrationTests {
@Autowired private ElasticsearchOperations operations;
private ClusterOperations clusterOperations;
@BeforeEach
void setUp() {
clusterOperations = operations.cluster();
}
@Test // #1390
@DisplayName("should return cluster health information")
void shouldReturnClusterHealthInformation() {
ClusterHealth clusterHealth = clusterOperations.health();
List<String> allowedStates = Arrays.asList("GREEN", "YELLOW");
assertThat(allowedStates).contains(clusterHealth.getStatus());
}
}

View File

@ -0,0 +1,25 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.cluster;
import org.springframework.data.elasticsearch.junit.jupiter.ElasticsearchTemplateConfiguration;
import org.springframework.test.context.ContextConfiguration;
/**
* @author Peter-Josef Meisch
*/
@ContextConfiguration(classes = { ElasticsearchTemplateConfiguration.class })
public class ClusterOperationsTransportIntegrationTests extends ClusterOperationsIntegrationTests {}

View File

@ -0,0 +1,62 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.cluster;
import static org.assertj.core.api.Assertions.*;
import reactor.test.StepVerifier;
import java.util.Arrays;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
import org.springframework.data.elasticsearch.junit.jupiter.ReactiveElasticsearchRestTemplateConfiguration;
import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest;
import org.springframework.test.context.ContextConfiguration;
/**
* @author Peter-Josef Meisch
*/
@SpringIntegrationTest
@ContextConfiguration(classes = { ReactiveElasticsearchRestTemplateConfiguration.class })
public class ReactiveClusterOperationsIntegrationTests {
@Autowired private ReactiveElasticsearchOperations operations;
private ReactiveClusterOperations clusterOperations;
@BeforeEach
void setUp() {
clusterOperations = operations.cluster();
}
@Test // #1390
@DisplayName("should return cluster health information")
void shouldReturnClusterHealthInformation() {
List<String> allowedStates = Arrays.asList("GREEN", "YELLOW");
clusterOperations.health() //
.as(StepVerifier::create) //
.consumeNextWith(clusterHealth -> { //
assertThat(allowedStates).contains(clusterHealth.getStatus()); //
}) //
.verifyComplete();
}
}