Add more implementations using the new client.

Original Pull Request #2136
See #1973
This commit is contained in:
Peter-Josef Meisch 2022-04-13 22:12:02 +02:00 committed by GitHub
parent ea4d3f9f30
commit 8cef50347e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
43 changed files with 1765 additions and 516 deletions

View File

@ -25,6 +25,14 @@ public class NoSuchIndexException extends NonTransientDataAccessResourceExceptio
private final String index;
/**
* @since 4.4
*/
public NoSuchIndexException(String index) {
super(String.format("Index %s not found.", index));
this.index = index;
}
public NoSuchIndexException(String index, Throwable cause) {
super(String.format("Index %s not found.", index), cause);
this.index = index;

View File

@ -0,0 +1,44 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client.elc;
import co.elastic.clients.elasticsearch._types.aggregations.Aggregate;
/**
* Class to combine an Elasticsearch {@link co.elastic.clients.elasticsearch._types.aggregations.Aggregate} with its
* name. Necessary as the Elasticsearch Aggregate does not know i"s name.
*
* @author Peter-Josef Meisch
* @since 4.4
*/
public class Aggregation {
private final String name;
private final Aggregate aggregate;
public Aggregation(String name, Aggregate aggregate) {
this.name = name;
this.aggregate = aggregate;
}
public String getName() {
return name;
}
public Aggregate getAggregate() {
return aggregate;
}
}

View File

@ -28,6 +28,7 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
@ -77,18 +78,12 @@ final class DocumentAdapters {
NestedMetaData nestedMetaData = from(hit.nested());
// todo #1973 explanation
Explanation explanation = from(hit.explanation());
// todo #1973 matchedQueries
List<String> matchedQueries = null;
// todo #1973 documentFields
Map<String, List<Object>> documentFields = Collections.emptyMap();
Document document;
Object source = hit.source();
if (source == null) {
// Elasticsearch provides raw JsonData, so we build the fields into a JSON string
Function<Map<String, JsonData>, EntityAsMap> fromFields = fields -> {
StringBuilder sb = new StringBuilder("{");
final boolean[] firstField = { true };
hit.fields().forEach((key, jsonData) -> {
@ -100,7 +95,25 @@ final class DocumentAdapters {
firstField[0] = false;
});
sb.append('}');
document = Document.parse(sb.toString());
return new EntityAsMap().fromJson(sb.toString());
};
EntityAsMap hitFieldsAsMap = fromFields.apply(hit.fields());
Map<String, List<Object>> documentFields = new LinkedHashMap<>();
hitFieldsAsMap.entrySet().forEach(entry -> {
if (entry.getValue() instanceof List) {
// noinspection unchecked
documentFields.put(entry.getKey(), (List<Object>) entry.getValue());
} else {
documentFields.put(entry.getKey(), Collections.singletonList(entry.getValue()));
}
});
Document document;
Object source = hit.source();
if (source == null) {
document = Document.from(hitFieldsAsMap);
} else {
if (source instanceof EntityAsMap) {
document = Document.from((EntityAsMap) source);

View File

@ -0,0 +1,37 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client.elc;
import org.springframework.data.elasticsearch.core.AggregationContainer;
/**
* {@link AggregationContainer} for a {@link Aggregation} that holds Elasticsearch data.
* @author Peter-Josef Meisch
* @since 4.4
*/
public class ElasticsearchAggregation implements AggregationContainer<Aggregation> {
private final Aggregation aggregation;
public ElasticsearchAggregation(Aggregation aggregation) {
this.aggregation = aggregation;
}
@Override
public Aggregation aggregation() {
return aggregation;
}
}

View File

@ -17,9 +17,12 @@ package org.springframework.data.elasticsearch.client.elc;
import co.elastic.clients.elasticsearch._types.aggregations.Aggregate;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.springframework.data.elasticsearch.core.AggregationsContainer;
import org.springframework.util.Assert;
/**
* AggregationsContainer implementation for the Elasticsearch aggregations.
@ -27,16 +30,33 @@ import org.springframework.data.elasticsearch.core.AggregationsContainer;
* @author Peter-Josef Meisch
* @since 4.4
*/
public class ElasticsearchAggregations implements AggregationsContainer<Map<String, Aggregate>> {
public class ElasticsearchAggregations implements AggregationsContainer<List<ElasticsearchAggregation>> {
private final Map<String, Aggregate> aggregations;
private final List<ElasticsearchAggregation> aggregations;
public ElasticsearchAggregations(List<ElasticsearchAggregation> aggregations) {
Assert.notNull(aggregations, "aggregations must not be null");
public ElasticsearchAggregations(Map<String, Aggregate> aggregations) {
this.aggregations = aggregations;
}
/**
* convenience constructor taking a map as it is returned from the new Elasticsearch client.
*
* @param aggregationsMap aggregate map
*/
public ElasticsearchAggregations(Map<String, Aggregate> aggregationsMap) {
Assert.notNull(aggregationsMap, "aggregationsMap must not be null");
aggregations = new ArrayList<>(aggregationsMap.size());
aggregationsMap
.forEach((name, aggregate) -> aggregations.add(new ElasticsearchAggregation(new Aggregation(name, aggregate))));
}
@Override
public Map<String, Aggregate> aggregations() {
public List<ElasticsearchAggregation> aggregations() {
return aggregations;
}
}

View File

@ -20,13 +20,18 @@ import co.elastic.clients.elasticsearch._types.ErrorResponse;
import co.elastic.clients.json.JsonpMapper;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.elasticsearch.client.ResponseException;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.dao.support.PersistenceExceptionTranslator;
import org.springframework.data.elasticsearch.RestStatusException;
import org.springframework.data.elasticsearch.NoSuchIndexException;
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
import org.springframework.http.HttpStatus;
/**
* Simple {@link PersistenceExceptionTranslator} for Elasticsearch. Convert the given runtime exception to an
@ -67,14 +72,28 @@ public class ElasticsearchExceptionTranslator implements PersistenceExceptionTra
return new OptimisticLockingFailureException("Cannot index a document due to seq_no+primary_term conflict", ex);
}
// todo #1973 index unavailable?
if (ex instanceof ElasticsearchException) {
ElasticsearchException elasticsearchException = (ElasticsearchException) ex;
ErrorResponse response = elasticsearchException.response();
if (response.status() == HttpStatus.NOT_FOUND.value()
&& "index_not_found_exception".equals(response.error().type())) {
Pattern pattern = Pattern.compile(".*no such index \\[(.*)\\]");
String index = "";
Matcher matcher = pattern.matcher(response.error().reason());
if (matcher.matches()) {
index = matcher.group(1);
}
return new NoSuchIndexException(index);
}
String body = JsonUtils.toJson(response, jsonpMapper);
if (response.error().type().contains("validation_exception")) {
return new DataIntegrityViolationException(response.error().reason());
}
return new UncategorizedElasticsearchException(ex.getMessage(), response.status(), body, ex);
}
@ -86,20 +105,22 @@ public class ElasticsearchExceptionTranslator implements PersistenceExceptionTra
return null;
}
private boolean isSeqNoConflict(Exception exception) {
private boolean isSeqNoConflict(Throwable exception) {
// todo #1973 check if this works
Integer status = null;
String message = null;
if (exception instanceof RestStatusException) {
RestStatusException statusException = (RestStatusException) exception;
status = statusException.getStatus();
message = statusException.getMessage();
if (exception instanceof ResponseException) {
ResponseException responseException = (ResponseException) exception;
status = responseException.getResponse().getStatusLine().getStatusCode();
message = responseException.getMessage();
} else if (exception.getCause() != null) {
return isSeqNoConflict(exception.getCause());
}
if (status != null && message != null) {
return status == 409 && message.contains("type=version_conflict_engine_exception")
return status == 409 && message.contains("type\":\"version_conflict_engine_exception")
&& message.contains("version conflict, required seqNo");
}

View File

@ -15,10 +15,13 @@
*/
package org.springframework.data.elasticsearch.client.elc;
import static org.springframework.data.elasticsearch.client.elc.TypeUtils.*;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import co.elastic.clients.elasticsearch.core.msearch.MultiSearchResponseItem;
import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.transport.Version;
@ -40,6 +43,7 @@ import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.SearchScrollHits;
import org.springframework.data.elasticsearch.core.cluster.ClusterOperations;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.BulkOptions;
@ -137,7 +141,12 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate {
@Override
public void bulkUpdate(List<UpdateQuery> queries, BulkOptions bulkOptions, IndexCoordinates index) {
throw new UnsupportedOperationException("not implemented");
Assert.notNull(queries, "queries must not be null");
Assert.notNull(bulkOptions, "bulkOptions must not be null");
Assert.notNull(index, "index must not be null");
doBulkOperation(queries, bulkOptions, index);
}
@Override
@ -155,12 +164,25 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate {
@Override
public UpdateResponse update(UpdateQuery updateQuery, IndexCoordinates index) {
throw new UnsupportedOperationException("not implemented");
UpdateRequest<Document, ?> request = requestConverter.documentUpdateRequest(updateQuery, index, getRefreshPolicy(),
routingResolver.getRouting());
co.elastic.clients.elasticsearch.core.UpdateResponse<Document> response = execute(
client -> client.update(request, Document.class));
return UpdateResponse.of(result(response.result()));
}
@Override
public ByQueryResponse updateByQuery(UpdateQuery updateQuery, IndexCoordinates index) {
throw new UnsupportedOperationException("not implemented");
Assert.notNull(updateQuery, "updateQuery must not be null");
Assert.notNull(index, "index must not be null");
UpdateByQueryRequest request = requestConverter.documentUpdateByQueryRequest(updateQuery, index,
getRefreshPolicy());
UpdateByQueryResponse byQueryResponse = execute(client -> client.updateByQuery(request));
return responseConverter.byQueryResponse(byQueryResponse);
}
@Override
@ -404,14 +426,52 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate {
return doMultiSearch(multiSearchQueryParameters);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private List<SearchHits<?>> doMultiSearch(List<MultiSearchQueryParameter> multiSearchQueryParameters) {
throw new UnsupportedOperationException("not implemented");
MsearchRequest request = requestConverter.searchMsearchRequest(multiSearchQueryParameters);
MsearchResponse<EntityAsMap> msearchResponse = execute(client -> client.msearch(request, EntityAsMap.class));
List<MultiSearchResponseItem<EntityAsMap>> responseItems = msearchResponse.responses();
Assert.isTrue(multiSearchQueryParameters.size() == responseItems.size(),
"number of response items does not match number of requests");
List<SearchHits<?>> searchHitsList = new ArrayList<>(multiSearchQueryParameters.size());
Iterator<MultiSearchQueryParameter> queryIterator = multiSearchQueryParameters.iterator();
Iterator<MultiSearchResponseItem<EntityAsMap>> responseIterator = responseItems.iterator();
while (queryIterator.hasNext()) {
MultiSearchQueryParameter queryParameter = queryIterator.next();
MultiSearchResponseItem<EntityAsMap> responseItem = responseIterator.next();
// if responseItem kind is Result then responsItem.value is a MultiSearchItem which is derived from SearchResponse
if (responseItem.isResult()) {
Class clazz = queryParameter.clazz;
ReadDocumentCallback<?> documentCallback = new ReadDocumentCallback<>(elasticsearchConverter, clazz,
queryParameter.index);
SearchDocumentResponseCallback<SearchHits<?>> callback = new ReadSearchDocumentResponseCallback<>(clazz,
queryParameter.index);
SearchHits<?> searchHits = callback.doWith(
SearchDocumentResponseBuilder.from(responseItem.result(), getEntityCreator(documentCallback), jsonpMapper));
searchHitsList.add(searchHits);
} else {
// todo #1973 add failure
}
}
return searchHitsList;
}
/**
* value class combining the information needed for a single query in a multisearch request.
*/
private static class MultiSearchQueryParameter {
static class MultiSearchQueryParameter {
final Query query;
final Class<?> clazz;
final IndexCoordinates index;

View File

@ -17,14 +17,18 @@ package org.springframework.data.elasticsearch.client.elc;
import co.elastic.clients.elasticsearch._types.aggregations.Aggregation;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.core.search.FieldCollapse;
import co.elastic.clients.elasticsearch.core.search.Suggester;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.springframework.data.elasticsearch.core.query.BaseQuery;
import org.springframework.data.elasticsearch.core.query.RescorerQuery;
import org.springframework.data.elasticsearch.core.query.ScriptedField;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* A {@link org.springframework.data.elasticsearch.core.query.Query} implementation using query builders from the new
@ -36,13 +40,23 @@ import org.springframework.util.Assert;
public class NativeQuery extends BaseQuery {
@Nullable private final Query query;
@Nullable private Query filter;
// note: the new client does not have pipeline aggs, these are just set up as normal aggs
private final Map<String, Aggregation> aggregations = new LinkedHashMap<>();
@Nullable private Suggester suggester;
@Nullable private FieldCollapse fieldCollapse;
private List<ScriptedField> scriptedFields = Collections.emptyList();
private List<RescorerQuery> rescorerQueries = Collections.emptyList();
public NativeQuery(NativeQueryBuilder builder) {
super(builder);
this.query = builder.getQuery();
this.filter = builder.getFilter();
this.aggregations.putAll(builder.getAggregations());
this.suggester = builder.getSuggester();
this.fieldCollapse = builder.getFieldCollapse();
this.scriptedFields = builder.getScriptedFields();
this.rescorerQueries = builder.getRescorerQueries();
}
public NativeQuery(@Nullable Query query) {
@ -58,20 +72,9 @@ public class NativeQuery extends BaseQuery {
return query;
}
public void addAggregation(String name, Aggregation aggregation) {
Assert.notNull(name, "name must not be null");
Assert.notNull(aggregation, "aggregation must not be null");
aggregations.put(name, aggregation);
}
public void setAggregations(Map<String, Aggregation> aggregations) {
Assert.notNull(aggregations, "aggregations must not be null");
this.aggregations.clear();
this.aggregations.putAll(aggregations);
@Nullable
public Query getFilter() {
return filter;
}
public Map<String, Aggregation> getAggregations() {
@ -83,8 +86,17 @@ public class NativeQuery extends BaseQuery {
return suggester;
}
public void setSuggester(@Nullable Suggester suggester) {
this.suggester = suggester;
@Nullable
public FieldCollapse getFieldCollapse() {
return fieldCollapse;
}
public List<ScriptedField> getScriptedFields() {
return scriptedFields;
}
@Override
public List<RescorerQuery> getRescorerQueries() {
return rescorerQueries;
}
}

View File

@ -17,14 +17,19 @@ package org.springframework.data.elasticsearch.client.elc;
import co.elastic.clients.elasticsearch._types.aggregations.Aggregation;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.core.search.FieldCollapse;
import co.elastic.clients.elasticsearch.core.search.Suggester;
import co.elastic.clients.util.ObjectBuilder;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.springframework.data.elasticsearch.core.query.BaseQueryBuilder;
import org.springframework.data.elasticsearch.core.query.RescorerQuery;
import org.springframework.data.elasticsearch.core.query.ScriptedField;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@ -35,17 +40,47 @@ import org.springframework.util.Assert;
public class NativeQueryBuilder extends BaseQueryBuilder<NativeQuery, NativeQueryBuilder> {
@Nullable private Query query;
@Nullable private Query filter;
private final Map<String, Aggregation> aggregations = new LinkedHashMap<>();
@Nullable private Suggester suggester;
@Nullable private FieldCollapse fieldCollapse;
private final List<ScriptedField> scriptedFields = new ArrayList<>();
private List<RescorerQuery> rescorerQueries = new ArrayList<>();
public NativeQueryBuilder() {
}
public NativeQueryBuilder() {}
@Nullable
public Query getQuery() {
return query;
}
@Nullable
public Query getFilter() {
return this.filter;
}
public Map<String, Aggregation> getAggregations() {
return aggregations;
}
@Nullable
public Suggester getSuggester() {
return suggester;
}
@Nullable
public FieldCollapse getFieldCollapse() {
return fieldCollapse;
}
public List<ScriptedField> getScriptedFields() {
return scriptedFields;
}
public List<RescorerQuery> getRescorerQueries() {
return rescorerQueries;
}
public NativeQueryBuilder withQuery(Query query) {
Assert.notNull(query, "query must not be null");
@ -54,6 +89,11 @@ public class NativeQueryBuilder extends BaseQueryBuilder<NativeQuery, NativeQuer
return this;
}
public NativeQueryBuilder withFilter(@Nullable Query filter) {
this.filter = filter;
return this;
}
public NativeQueryBuilder withQuery(Function<Query.Builder, ObjectBuilder<Query>> fn) {
Assert.notNull(fn, "fn must not be null");
@ -75,11 +115,28 @@ public class NativeQueryBuilder extends BaseQueryBuilder<NativeQuery, NativeQuer
return this;
}
public NativeQuery build() {
NativeQuery nativeQuery = new NativeQuery(this);
nativeQuery.setAggregations(aggregations);
nativeQuery.setSuggester(suggester);
public NativeQueryBuilder withFieldCollapse(@Nullable FieldCollapse fieldCollapse) {
this.fieldCollapse = fieldCollapse;
return this;
}
return nativeQuery;
public NativeQueryBuilder withScriptedField(ScriptedField scriptedField) {
Assert.notNull(scriptedField, "scriptedField must not be null");
this.scriptedFields.add(scriptedField);
return this;
}
public NativeQueryBuilder withResorerQuery(RescorerQuery resorerQuery) {
Assert.notNull(resorerQuery, "resorerQuery must not be null");
this.rescorerQueries.add(resorerQuery);
return this;
}
public NativeQuery build() {
return new NativeQuery(this);
}
}

View File

@ -127,12 +127,51 @@ public class ReactiveElasticsearchClient extends ApiClient<ElasticsearchTranspor
return Mono.fromFuture(transport.performRequestAsync(request, endpoint, transportOptions));
}
public <T, P> Mono<UpdateResponse<T>> update(UpdateRequest<T, P> request, Class<T> clazz) {
Assert.notNull(request, "request must not be null");
// noinspection unchecked
JsonEndpoint<UpdateRequest<?, ?>, UpdateResponse<T>, ErrorResponse> endpoint = new EndpointWithResponseMapperAttr(
UpdateRequest._ENDPOINT, "co.elastic.clients:Deserializer:_global.update.TDocument",
this.getDeserializer(clazz));
return Mono.fromFuture(transport.performRequestAsync(request, endpoint, this.transportOptions));
}
public <T, P> Mono<UpdateResponse<T>> update(
Function<UpdateRequest.Builder<T, P>, ObjectBuilder<UpdateRequest<T, P>>> fn, Class<T> clazz) {
Assert.notNull(fn, "fn must not be null");
return update(fn.apply(new UpdateRequest.Builder<>()).build(), clazz);
}
public <T> Mono<GetResponse<T>> get(Function<GetRequest.Builder, ObjectBuilder<GetRequest>> fn, Class<T> tClass) {
Assert.notNull(fn, "fn must not be null");
return get(fn.apply(new GetRequest.Builder()).build(), tClass);
}
public <T> Mono<MgetResponse<T>> mget(MgetRequest request, Class<T> clazz) {
Assert.notNull(request, "request must not be null");
Assert.notNull(clazz, "clazz must not be null");
// noinspection unchecked
JsonEndpoint<MgetRequest, MgetResponse<T>, ErrorResponse> endpoint = (JsonEndpoint<MgetRequest, MgetResponse<T>, ErrorResponse>) MgetRequest._ENDPOINT;
endpoint = new EndpointWithResponseMapperAttr<>(endpoint, "co.elastic.clients:Deserializer:_global.mget.TDocument",
this.getDeserializer(clazz));
return Mono.fromFuture(transport.performRequestAsync(request, endpoint, transportOptions));
}
public <T> Mono<MgetResponse<T>> mget(Function<MgetRequest.Builder, ObjectBuilder<MgetRequest>> fn, Class<T> clazz) {
Assert.notNull(fn, "fn must not be null");
return mget(fn.apply(new MgetRequest.Builder()).build(), clazz);
}
public Mono<ReindexResponse> reindex(ReindexRequest request) {
Assert.notNull(request, "request must not be null");
@ -161,6 +200,21 @@ public class ReactiveElasticsearchClient extends ApiClient<ElasticsearchTranspor
return delete(fn.apply(new DeleteRequest.Builder()).build());
}
public Mono<DeleteByQueryResponse> deleteByQuery(DeleteByQueryRequest request) {
Assert.notNull(request, "request must not be null");
return Mono.fromFuture(transport.performRequestAsync(request, DeleteByQueryRequest._ENDPOINT, transportOptions));
}
public Mono<DeleteByQueryResponse> deleteByQuery(
Function<DeleteByQueryRequest.Builder, ObjectBuilder<DeleteByQueryRequest>> fn) {
Assert.notNull(fn, "fn must not be null");
return deleteByQuery(fn.apply(new DeleteByQueryRequest.Builder()).build());
}
// endregion
// region search

View File

@ -212,14 +212,6 @@ public class ReactiveElasticsearchIndicesClient
return existsTemplate(fn.apply(new ExistsTemplateRequest.Builder()).build());
}
public Mono<BooleanResponse> existsType(ExistsTypeRequest request) {
return Mono.fromFuture(transport.performRequestAsync(request, ExistsTypeRequest._ENDPOINT, transportOptions));
}
public Mono<BooleanResponse> existsType(Function<ExistsTypeRequest.Builder, ObjectBuilder<ExistsTypeRequest>> fn) {
return existsType(fn.apply(new ExistsTypeRequest.Builder()).build());
}
public Mono<FlushResponse> flush(FlushRequest request) {
return Mono.fromFuture(transport.performRequestAsync(request, FlushRequest._ENDPOINT, transportOptions));
}
@ -232,19 +224,6 @@ public class ReactiveElasticsearchIndicesClient
return flush(builder -> builder);
}
public Mono<FlushSyncedResponse> flushSynced(FlushSyncedRequest request) {
return Mono.fromFuture(transport.performRequestAsync(request, FlushSyncedRequest._ENDPOINT, transportOptions));
}
public Mono<FlushSyncedResponse> flushSynced(
Function<FlushSyncedRequest.Builder, ObjectBuilder<FlushSyncedRequest>> fn) {
return flushSynced(fn.apply(new FlushSyncedRequest.Builder()).build());
}
public Mono<FlushSyncedResponse> flushSynced() {
return flushSynced(builder -> builder);
}
@SuppressWarnings("SpellCheckingInspection")
public Mono<ForcemergeResponse> forcemerge(ForcemergeRequest request) {
return Mono.fromFuture(transport.performRequestAsync(request, ForcemergeRequest._ENDPOINT, transportOptions));
@ -260,14 +239,6 @@ public class ReactiveElasticsearchIndicesClient
return forcemerge(builder -> builder);
}
public Mono<FreezeResponse> freeze(FreezeRequest request) {
return Mono.fromFuture(transport.performRequestAsync(request, FreezeRequest._ENDPOINT, transportOptions));
}
public Mono<FreezeResponse> freeze(Function<FreezeRequest.Builder, ObjectBuilder<FreezeRequest>> fn) {
return freeze(fn.apply(new FreezeRequest.Builder()).build());
}
public Mono<GetIndexResponse> get(GetIndexRequest request) {
return Mono.fromFuture(transport.performRequestAsync(request, GetIndexRequest._ENDPOINT, transportOptions));
}
@ -363,18 +334,6 @@ public class ReactiveElasticsearchIndicesClient
return getTemplate(builder -> builder);
}
public Mono<GetUpgradeResponse> getUpgrade(GetUpgradeRequest request) {
return Mono.fromFuture(transport.performRequestAsync(request, GetUpgradeRequest._ENDPOINT, transportOptions));
}
public Mono<GetUpgradeResponse> getUpgrade(Function<GetUpgradeRequest.Builder, ObjectBuilder<GetUpgradeRequest>> fn) {
return getUpgrade(fn.apply(new GetUpgradeRequest.Builder()).build());
}
public Mono<GetUpgradeResponse> getUpgrade() {
return getUpgrade(builder -> builder);
}
public Mono<MigrateToDataStreamResponse> migrateToDataStream(MigrateToDataStreamRequest request) {
return Mono
.fromFuture(transport.performRequestAsync(request, MigrateToDataStreamRequest._ENDPOINT, transportOptions));
@ -601,18 +560,6 @@ public class ReactiveElasticsearchIndicesClient
return updateAliases(builder -> builder);
}
public Mono<UpgradeResponse> upgrade(UpgradeRequest request) {
return Mono.fromFuture(transport.performRequestAsync(request, UpgradeRequest._ENDPOINT, transportOptions));
}
public Mono<UpgradeResponse> upgrade(Function<UpgradeRequest.Builder, ObjectBuilder<UpgradeRequest>> fn) {
return upgrade(fn.apply(new UpgradeRequest.Builder()).build());
}
public Mono<UpgradeResponse> upgrade() {
return upgrade(builder -> builder);
}
public Mono<ValidateQueryResponse> validateQuery(ValidateQueryRequest request) {
return Mono.fromFuture(transport.performRequestAsync(request, ValidateQueryRequest._ENDPOINT, transportOptions));
}

View File

@ -15,10 +15,13 @@
*/
package org.springframework.data.elasticsearch.client.elc;
import static org.springframework.data.elasticsearch.client.elc.TypeUtils.*;
import co.elastic.clients.elasticsearch._types.Result;
import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import co.elastic.clients.elasticsearch.core.get.GetResult;
import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.transport.Version;
import reactor.core.publisher.Flux;
@ -46,6 +49,7 @@ import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperatio
import org.springframework.data.elasticsearch.core.ReactiveIndexOperations;
import org.springframework.data.elasticsearch.core.cluster.ReactiveClusterOperations;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.document.SearchDocument;
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
@ -103,6 +107,34 @@ public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearch
)));
}
@Override
public <T> Flux<T> saveAll(Mono<? extends Collection<? extends T>> entitiesPublisher, IndexCoordinates index) {
Assert.notNull(entitiesPublisher, "entitiesPublisher must not be null!");
return entitiesPublisher //
.flatMapMany(entities -> Flux.fromIterable(entities) //
.concatMap(entity -> maybeCallBeforeConvert(entity, index)) //
).collectList() //
.map(Entities::new) //
.flatMapMany(entities -> {
if (entities.isEmpty()) {
return Flux.empty();
}
return doBulkOperation(entities.indexQueries(), BulkOptions.defaultOptions(), index)//
.index() //
.flatMap(indexAndResponse -> {
T savedEntity = entities.entityAt(indexAndResponse.getT1());
BulkResponseItem response = indexAndResponse.getT2();
updateIndexedObject(savedEntity, IndexedObjectInformation.of(response.id(), response.seqNo(),
response.primaryTerm(), response.version()));
return maybeCallAfterSave(savedEntity, index);
});
});
}
@Override
public <T> Mono<T> get(String id, Class<T> entityType, IndexCoordinates index) {
@ -144,9 +176,9 @@ public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearch
return Mono.from(execute( //
(ClientCallback<Publisher<co.elastic.clients.elasticsearch.core.ReindexResponse>>) client -> client
.reindex(reindexRequestES)))
.flatMap(response -> (response.task() == null)
? Mono.error( // todo #1973 check behaviour and create issue in ES if necessary
new UnsupportedBackendOperation("ElasticsearchClient did not return a task id on submit request"))
.flatMap(response -> (response.task() == null) ? Mono.error( // todo #1973 check behaviour and create issue in
// ES if necessary
new UnsupportedBackendOperation("ElasticsearchClient did not return a task id on submit request"))
: Mono.just(response.task()));
}
@ -170,7 +202,7 @@ public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearch
}
private <R> Mono<BulkResponse> checkForBulkOperationFailure(BulkResponse bulkResponse) {
private Mono<BulkResponse> checkForBulkOperationFailure(BulkResponse bulkResponse) {
if (bulkResponse.errors()) {
Map<String, String> failedDocuments = new HashMap<>();
@ -214,6 +246,31 @@ public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearch
}).onErrorResume(NoSuchIndexException.class, it -> Mono.empty());
}
@Override
public <T> Flux<MultiGetItem<T>> multiGet(Query query, Class<T> clazz, IndexCoordinates index) {
Assert.notNull(query, "query must not be null");
Assert.notNull(clazz, "clazz must not be null");
MgetRequest request = requestConverter.documentMgetRequest(query, clazz, index);
ReadDocumentCallback<T> callback = new ReadDocumentCallback<>(converter, clazz, index);
Publisher<MgetResponse<EntityAsMap>> response = execute(
(ClientCallback<Publisher<MgetResponse<EntityAsMap>>>) client -> client.mget(request, EntityAsMap.class));
return Mono.from(response)//
.flatMapMany(it -> Flux.fromIterable(DocumentAdapters.from(it))) //
.flatMap(multiGetItem -> {
if (multiGetItem.isFailed()) {
return Mono.just(MultiGetItem.of(null, multiGetItem.getFailure()));
} else {
return callback.toEntity(multiGetItem.getItem()) //
.map(t -> MultiGetItem.of(t, multiGetItem.getFailure()));
}
});
}
// endregion
@Override
@ -223,8 +280,30 @@ public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearch
@Override
protected Mono<Boolean> doExists(String id, IndexCoordinates index) {
throw new UnsupportedOperationException("not implemented");
Assert.notNull(id, "id must not be null");
Assert.notNull(index, "index must not be null");
GetRequest getRequest = requestConverter.documentGetRequest(id, routingResolver.getRouting(), index, true);
return Mono.from(execute(
((ClientCallback<Publisher<GetResponse<EntityAsMap>>>) client -> client.get(getRequest, EntityAsMap.class))))
.map(GetResult::found) //
.onErrorReturn(NoSuchIndexException.class, false);
}
@Override
public Mono<ByQueryResponse> delete(Query query, Class<?> entityType, IndexCoordinates index) {
Assert.notNull(query, "query must not be null");
DeleteByQueryRequest request = requestConverter.documentDeleteByQueryRequest(query, entityType, index,
getRefreshPolicy());
return Mono
.from(execute((ClientCallback<Publisher<DeleteByQueryResponse>>) client -> client.deleteByQuery(request)))
.map(responseConverter::byQueryResponse);
}
// region search operations
@Override
@ -307,7 +386,7 @@ public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearch
SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, false, false);
// noinspection unchecked
SearchDocumentCallback<T> callback = new ReadSearchDocumentCallback<T>((Class<T>) clazz, index);
SearchDocumentCallback<T> callback = new ReadSearchDocumentCallback<>((Class<T>) clazz, index);
SearchDocumentResponse.EntityCreator<T> entityCreator = searchDocument -> callback.toEntity(searchDocument)
.toFuture();
@ -317,6 +396,15 @@ public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearch
.map(searchResponse -> SearchDocumentResponseBuilder.from(searchResponse, entityCreator, jsonpMapper));
}
@Override
public Flux<? extends AggregationContainer<?>> aggregate(Query query, Class<?> entityType, IndexCoordinates index) {
return doFindForResponse(query, entityType, index).flatMapMany(searchDocumentResponse -> {
ElasticsearchAggregations aggregations = (ElasticsearchAggregations) searchDocumentResponse.getAggregations();
return aggregations == null ? Flux.empty() : Flux.fromIterable(aggregations.aggregations());
});
}
// endregion
@Override
@ -326,7 +414,7 @@ public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearch
@Override
protected Mono<String> getRuntimeLibraryVersion() {
return Mono.just(Version.VERSION.toString());
return Mono.just(Version.VERSION != null ? Version.VERSION.toString() : "null");
}
@Override
@ -334,47 +422,22 @@ public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearch
return Mono.from(execute(ReactiveElasticsearchClient::info)).map(infoResponse -> infoResponse.version().number());
}
@Override
public <T> Flux<T> saveAll(Mono<? extends Collection<? extends T>> entitiesPublisher, IndexCoordinates index) {
Assert.notNull(entitiesPublisher, "entitiesPublisher must not be null!");
return entitiesPublisher //
.flatMapMany(entities -> Flux.fromIterable(entities) //
.concatMap(entity -> maybeCallBeforeConvert(entity, index)) //
).collectList() //
.map(Entities::new) //
.flatMapMany(entities -> {
if (entities.isEmpty()) {
return Flux.empty();
}
return doBulkOperation(entities.indexQueries(), BulkOptions.defaultOptions(), index)//
.index() //
.flatMap(indexAndResponse -> {
T savedEntity = entities.entityAt(indexAndResponse.getT1());
BulkResponseItem response = indexAndResponse.getT2();
updateIndexedObject(savedEntity, IndexedObjectInformation.of(response.id(), response.seqNo(),
response.primaryTerm(), response.version()));
return maybeCallAfterSave(savedEntity, index);
});
});
}
@Override
public <T> Flux<MultiGetItem<T>> multiGet(Query query, Class<T> clazz, IndexCoordinates index) {
throw new UnsupportedOperationException("not implemented");
}
@Override
public Mono<ByQueryResponse> delete(Query query, Class<?> entityType, IndexCoordinates index) {
throw new UnsupportedOperationException("not implemented");
}
@Override
public Mono<UpdateResponse> update(UpdateQuery updateQuery, IndexCoordinates index) {
throw new UnsupportedOperationException("not implemented");
Assert.notNull(updateQuery, "UpdateQuery must not be null");
Assert.notNull(index, "Index must not be null");
UpdateRequest<Document, ?> request = requestConverter.documentUpdateRequest(updateQuery, index, getRefreshPolicy(),
routingResolver.getRouting());
return Mono.from(execute(
(ClientCallback<Publisher<co.elastic.clients.elasticsearch.core.UpdateResponse<Document>>>) client -> client
.update(request, Document.class)))
.flatMap(response -> {
UpdateResponse.Result result = result(response.result());
return result == null ? Mono.empty() : Mono.just(UpdateResponse.of(result));
});
}
@Override
@ -413,11 +476,6 @@ public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearch
return new ReactiveClusterTemplate(client.cluster(), converter);
}
@Override
public Flux<AggregationContainer<?>> aggregate(Query query, Class<?> entityType, IndexCoordinates index) {
throw new UnsupportedOperationException("not implemented");
}
@Override
public Flux<Suggest> suggest(SuggestBuilder suggestion, Class<?> entityType) {
throw new UnsupportedOperationException("not implemented");

View File

@ -297,7 +297,7 @@ public class ReactiveIndicesTemplate extends ReactiveChildTemplate<ReactiveElast
co.elastic.clients.elasticsearch.indices.ExistsTemplateRequest existsTemplateRequestES = requestConverter
.indicesExistsTemplateRequest(existsTemplateRequest);
return Mono.from(execute(client1 -> client1.existsTemplate(existsTemplateRequestES))).map(BooleanResponse::value);
return Mono.from(execute(client -> client.existsTemplate(existsTemplateRequestES))).map(BooleanResponse::value);
}
@Override
@ -307,13 +307,18 @@ public class ReactiveIndicesTemplate extends ReactiveChildTemplate<ReactiveElast
co.elastic.clients.elasticsearch.indices.DeleteTemplateRequest deleteTemplateRequestES = requestConverter
.indicesDeleteTemplateRequest(deleteTemplateRequest);
return Mono.from(execute(client1 -> client1.deleteTemplate(deleteTemplateRequestES)))
return Mono.from(execute(client -> client.deleteTemplate(deleteTemplateRequestES)))
.map(DeleteTemplateResponse::acknowledged);
}
@Override
public Flux<IndexInformation> getInformation(IndexCoordinates index) {
throw new UnsupportedOperationException("not implemented");
GetIndexRequest request = requestConverter.indicesGetIndexRequest(index);
return Mono.from(execute(client -> client.get(request))) //
.map(responseConverter::indicesGetIndexInformations) //
.flatMapMany(Flux::fromIterable);
}
@Override

View File

@ -16,14 +16,15 @@
package org.springframework.data.elasticsearch.client.elc;
import static org.springframework.data.elasticsearch.client.elc.TypeUtils.*;
import static org.springframework.util.ObjectUtils.*;
import static org.springframework.util.CollectionUtils.*;
import co.elastic.clients.elasticsearch._types.Conflicts;
import co.elastic.clients.elasticsearch._types.InlineScript;
import co.elastic.clients.elasticsearch._types.OpType;
import co.elastic.clients.elasticsearch._types.SortOptions;
import co.elastic.clients.elasticsearch._types.SortOrder;
import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch._types.VersionType;
import co.elastic.clients.elasticsearch._types.WaitForActiveShardOptions;
import co.elastic.clients.elasticsearch._types.mapping.FieldType;
import co.elastic.clients.elasticsearch._types.mapping.Property;
import co.elastic.clients.elasticsearch._types.mapping.RuntimeField;
@ -31,20 +32,17 @@ import co.elastic.clients.elasticsearch._types.mapping.RuntimeFieldType;
import co.elastic.clients.elasticsearch._types.mapping.TypeMapping;
import co.elastic.clients.elasticsearch._types.query_dsl.Like;
import co.elastic.clients.elasticsearch.cluster.HealthRequest;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.DeleteByQueryRequest;
import co.elastic.clients.elasticsearch.core.DeleteRequest;
import co.elastic.clients.elasticsearch.core.GetRequest;
import co.elastic.clients.elasticsearch.core.IndexRequest;
import co.elastic.clients.elasticsearch.core.MgetRequest;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.bulk.CreateOperation;
import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
import co.elastic.clients.elasticsearch.core.bulk.UpdateOperation;
import co.elastic.clients.elasticsearch.core.mget.MultiGetOperation;
import co.elastic.clients.elasticsearch.core.search.Highlight;
import co.elastic.clients.elasticsearch.core.search.Rescore;
import co.elastic.clients.elasticsearch.core.search.SourceConfig;
import co.elastic.clients.elasticsearch.indices.*;
import co.elastic.clients.elasticsearch.indices.ExistsRequest;
import co.elastic.clients.elasticsearch.indices.update_aliases.Action;
import co.elastic.clients.json.JsonData;
import co.elastic.clients.json.JsonpDeserializer;
@ -58,6 +56,7 @@ import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -67,6 +66,7 @@ import java.util.stream.Collectors;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.core.RefreshPolicy;
import org.springframework.data.elasticsearch.core.ScriptType;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.index.AliasAction;
@ -562,6 +562,79 @@ class RequestConverter {
return builder.build();
}
private UpdateOperation<?, ?> bulkUpdateOperation(UpdateQuery query, IndexCoordinates index,
@Nullable RefreshPolicy refreshPolicy) {
UpdateOperation.Builder<Object, Object> uob = new UpdateOperation.Builder<>();
String indexName = query.getIndexName() != null ? query.getIndexName() : index.getIndexName();
uob.index(indexName).id(query.getId());
uob.action(a -> {
a //
.script(getScript(query.getScriptData())) //
.doc(query.getDocument()) //
.upsert(query.getUpsert()) //
.scriptedUpsert(query.getScriptedUpsert()) //
.docAsUpsert(query.getDocAsUpsert()) //
;
if (query.getFetchSource() != null) {
a.source(sc -> sc.fetch(query.getFetchSource()));
}
if (query.getFetchSourceIncludes() != null || query.getFetchSourceExcludes() != null) {
List<String> includes = query.getFetchSourceIncludes() != null ? query.getFetchSourceIncludes()
: Collections.emptyList();
List<String> excludes = query.getFetchSourceExcludes() != null ? query.getFetchSourceExcludes()
: Collections.emptyList();
a.source(sc -> sc.filter(sf -> sf.includes(includes).excludes(excludes)));
}
return a;
});
uob //
.routing(query.getRouting()) //
.ifSeqNo(query.getIfSeqNo() != null ? Long.valueOf(query.getIfSeqNo()) : null) //
.ifPrimaryTerm(query.getIfPrimaryTerm() != null ? Long.valueOf(query.getIfPrimaryTerm()) : null) //
.retryOnConflict(query.getRetryOnConflict()) //
;
// no refresh, timeout, waitForActiveShards on UpdateOperation or UpdateAction
return uob.build();
}
@Nullable
private co.elastic.clients.elasticsearch._types.Script getScript(@Nullable ScriptData scriptData) {
if (scriptData == null) {
return null;
}
Map<String, JsonData> params = new HashMap<>();
if (scriptData.getParams() != null) {
scriptData.getParams().forEach((key, value) -> {
params.put(key, JsonData.of(value, jsonpMapper));
});
}
return co.elastic.clients.elasticsearch._types.Script.of(sb -> {
if (scriptData.getType() == ScriptType.INLINE) {
sb.inline(is -> is //
.lang(scriptData.getLanguage()) //
.source(scriptData.getScript()) //
.params(params)); //
} else if (scriptData.getType() == ScriptType.STORED) {
sb.stored(ss -> ss //
.id(scriptData.getScript()) //
.params(params) //
);
}
return sb;
});
}
public BulkRequest documentBulkRequest(List<?> queries, BulkOptions bulkOptions, IndexCoordinates indexCoordinates,
@Nullable RefreshPolicy refreshPolicy) {
@ -599,7 +672,8 @@ class RequestConverter {
ob.index(bulkIndexOperation(indexQuery, indexCoordinates, refreshPolicy));
}
} else if (query instanceof UpdateQuery) {
// todo #1973
UpdateQuery updateQuery = (UpdateQuery) query;
ob.update(bulkUpdateOperation(updateQuery, indexCoordinates, refreshPolicy));
}
return ob.build();
}).collect(Collectors.toList());
@ -674,7 +748,7 @@ class RequestConverter {
}
if (source.getQuery() != null) {
s.query(getQuery(source.getQuery()));
s.query(getQuery(source.getQuery(), null));
}
if (source.getRemote() != null) {
@ -731,13 +805,8 @@ class RequestConverter {
builder.script(s -> s.inline(InlineScript.of(i -> i.lang(script.getLang()).source(script.getSource()))));
}
if (reindexRequest.getTimeout() != null) {
builder.timeout(tv -> tv.time(reindexRequest.getTimeout().toMillis() + "ms"));
}
if (reindexRequest.getScroll() != null) {
builder.scroll(tv -> tv.time(reindexRequest.getScroll().toMillis() + "ms"));
}
builder.timeout(time(reindexRequest.getTimeout())) //
.scroll(time(reindexRequest.getScroll()));
if (reindexRequest.getWaitForActiveShards() != null) {
builder.waitForActiveShards(wfas -> wfas //
@ -779,7 +848,7 @@ class RequestConverter {
return DeleteByQueryRequest.of(b -> {
b.index(Arrays.asList(index.getIndexNames())) //
.query(getQuery(query))//
.query(getQuery(query, clazz))//
.refresh(deleteByQueryRefresh(refreshPolicy));
if (query.isLimiting()) {
@ -787,10 +856,7 @@ class RequestConverter {
b.maxDocs(Long.valueOf(query.getMaxResults()));
}
if (query.hasScrollTime()) {
// noinspection ConstantConditions
b.scroll(Time.of(t -> t.time(query.getScrollTime().toMillis() + "ms")));
}
b.scroll(time(query.getScrollTime()));
if (query.getRoute() != null) {
b.routing(query.getRoute());
@ -800,6 +866,140 @@ class RequestConverter {
});
}
public UpdateRequest<Document, ?> documentUpdateRequest(UpdateQuery query, IndexCoordinates index,
@Nullable RefreshPolicy refreshPolicy, @Nullable String routing) {
String indexName = query.getIndexName() != null ? query.getIndexName() : index.getIndexName();
return UpdateRequest.of(uqb -> {
uqb.index(indexName).id(query.getId());
if (query.getScript() != null) {
Map<String, JsonData> params = new HashMap<>();
if (query.getParams() != null) {
query.getParams().forEach((key, value) -> {
params.put(key, JsonData.of(value, jsonpMapper));
});
}
uqb.script(sb -> {
if (query.getScriptType() == ScriptType.INLINE) {
sb.inline(is -> is //
.lang(query.getLang()) //
.source(query.getScript()) //
.params(params)); //
} else if (query.getScriptType() == ScriptType.STORED) {
sb.stored(ss -> ss //
.id(query.getScript()) //
.params(params) //
);
}
return sb;
}
);
}
uqb //
.doc(query.getDocument()) //
.upsert(query.getUpsert()) //
.routing(query.getRouting() != null ? query.getRouting() : routing) //
.scriptedUpsert(query.getScriptedUpsert()) //
.docAsUpsert(query.getDocAsUpsert()) //
.ifSeqNo(query.getIfSeqNo() != null ? Long.valueOf(query.getIfSeqNo()) : null) //
.ifPrimaryTerm(query.getIfPrimaryTerm() != null ? Long.valueOf(query.getIfPrimaryTerm()) : null) //
.refresh(refresh(refreshPolicy)) //
.retryOnConflict(query.getRetryOnConflict()) //
;
if (query.getFetchSource() != null) {
uqb.source(sc -> sc.fetch(query.getFetchSource()));
}
if (query.getFetchSourceIncludes() != null || query.getFetchSourceExcludes() != null) {
List<String> includes = query.getFetchSourceIncludes() != null ? query.getFetchSourceIncludes()
: Collections.emptyList();
List<String> excludes = query.getFetchSourceExcludes() != null ? query.getFetchSourceExcludes()
: Collections.emptyList();
uqb.source(sc -> sc.filter(sf -> sf.includes(includes).excludes(excludes)));
}
if (query.getTimeout() != null) {
uqb.timeout(tv -> tv.time(query.getTimeout()));
}
String waitForActiveShards = query.getWaitForActiveShards();
if (waitForActiveShards != null) {
if ("all".equalsIgnoreCase(waitForActiveShards)) {
uqb.waitForActiveShards(wfa -> wfa.option(WaitForActiveShardOptions.All));
} else {
int val;
try {
val = Integer.parseInt(waitForActiveShards);
} catch (NumberFormatException var3) {
throw new IllegalArgumentException("cannot parse ActiveShardCount[" + waitForActiveShards + "]", var3);
}
uqb.waitForActiveShards(wfa -> wfa.count(val));
}
}
return uqb;
} //
);
}
public UpdateByQueryRequest documentUpdateByQueryRequest(UpdateQuery updateQuery, IndexCoordinates index,
@Nullable RefreshPolicy refreshPolicy) {
return UpdateByQueryRequest.of(ub -> {
ub //
.index(Arrays.asList(index.getIndexNames())) //
.refresh(refreshPolicy == RefreshPolicy.IMMEDIATE) //
.routing(updateQuery.getRouting()) //
.script(getScript(updateQuery.getScriptData())) //
.maxDocs(updateQuery.getMaxDocs() != null ? Long.valueOf(updateQuery.getMaxDocs()) : null) //
.pipeline(updateQuery.getPipeline()) //
.requestsPerSecond(
updateQuery.getRequestsPerSecond() != null ? updateQuery.getRequestsPerSecond().longValue() : null) //
.slices(updateQuery.getSlices() != null ? Long.valueOf(updateQuery.getSlices()) : null) //
;
if (updateQuery.getAbortOnVersionConflict() != null) {
ub.conflicts(updateQuery.getAbortOnVersionConflict() ? Conflicts.Abort : Conflicts.Proceed);
}
if (updateQuery.getBatchSize() != null) {
ub.size(Long.valueOf(updateQuery.getBatchSize()));
}
if (updateQuery.getQuery() != null) {
Query queryQuery = updateQuery.getQuery();
ub.query(getQuery(queryQuery, null));
// no indicesOptions available like in old client
ub.scroll(time(queryQuery.getScrollTime()));
}
// no maxRetries available like in old client
// no shouldStoreResult
if (updateQuery.getRefreshPolicy() != null) {
ub.refresh(updateQuery.getRefreshPolicy() == RefreshPolicy.IMMEDIATE);
}
if (updateQuery.getTimeout() != null) {
ub.timeout(tb -> tb.time(updateQuery.getTimeout()));
}
if (updateQuery.getWaitForActiveShards() != null) {
ub.waitForActiveShards(w -> w.count(waitForActiveShardsCount(updateQuery.getWaitForActiveShards())));
}
return ub;
});
}
// endregion
// region search
@ -831,13 +1031,36 @@ class RequestConverter {
builder.scroll(t -> t.time(scrollTimeInMillis + "ms"));
}
builder.query(getQuery(query));
builder.query(getQuery(query, clazz));
addFilter(query, builder);
return builder.build();
}
public MsearchRequest searchMsearchRequest(
List<ElasticsearchTemplate.MultiSearchQueryParameter> multiSearchQueryParameters) {
return MsearchRequest.of(mrb -> {
multiSearchQueryParameters.forEach(param -> {
ElasticsearchPersistentEntity<?> persistentEntity = getPersistentEntity(param.clazz);
mrb.searches(sb -> sb //
.header(h -> h //
.index(param.index.getIndexName()) //
// todo #1973 add remaining flags for header
) //
.body(bb -> bb //
.query(getQuery(param.query, param.clazz))//
// #1973 seq_no_primary_term and version not available in client ES issue 161
// todo #1973 add remaining flags for body
) //
);
});
return mrb;
});
}
private <T> void prepareSearchRequest(Query query, @Nullable Class<T> clazz, IndexCoordinates indexCoordinates,
SearchRequest.Builder builder, boolean forCount, boolean useScroll) {
@ -930,7 +1153,11 @@ class RequestConverter {
if (!isEmpty(query.getSearchAfter())) {
builder.searchAfter(query.getSearchAfter().stream().map(Object::toString).collect(Collectors.toList()));
}
// todo #1973 rescorer queries
query.getRescorerQueries().forEach(rescorerQuery -> {
builder.rescore(getRescore(rescorerQuery));
});
// todo #1973 request cache
if (!query.getRuntimeFields().isEmpty()) {
@ -952,12 +1179,27 @@ class RequestConverter {
// request_cache is not allowed on scroll requests.
builder.requestCache(null);
Duration scrollTimeout = query.getScrollTime() != null ? query.getScrollTime() : Duration.ofMinutes(1);
builder.scroll(tv -> tv.time(scrollTimeout.toMillis() + "ms"));
builder.scroll(time(scrollTimeout));
// limit the number of documents in a batch
builder.size(500);
}
}
private Rescore getRescore(RescorerQuery rescorerQuery) {
return Rescore.of(r -> r //
.query(rq -> rq //
.query(getQuery(rescorerQuery.getQuery(), null)) //
.scoreMode(scoreMode(rescorerQuery.getScoreMode())) //
.queryWeight(rescorerQuery.getQueryWeight() != null ? Double.valueOf(rescorerQuery.getQueryWeight()) : 1.0) //
.rescoreQueryWeight(
rescorerQuery.getRescoreQueryWeight() != null ? Double.valueOf(rescorerQuery.getRescoreQueryWeight())
: 1.0) //
) //
.windowSize(rescorerQuery.getWindowSize()));
}
private void addHighlight(Query query, SearchRequest.Builder builder) {
Highlight highlight = query.getHighlightQuery()
@ -1032,23 +1274,35 @@ class RequestConverter {
}
private void prepareNativeSearch(NativeQuery query, SearchRequest.Builder builder) {
// todo #1973 script fields
// todo #1973 collapse builder
query.getScriptedFields().forEach(scriptedField -> {
builder.scriptFields(scriptedField.getFieldName(), sf -> sf.script(getScript(scriptedField.getScriptData())));
});
builder //
.suggest(query.getSuggester()) //
.collapse(query.getFieldCollapse()) //
;
// todo #1973 indices boost
if (!isEmpty(query.getAggregations())) {
builder.aggregations(query.getAggregations());
}
builder.suggest(query.getSuggester());
// todo #1973 searchExt
}
@Nullable
private co.elastic.clients.elasticsearch._types.query_dsl.Query getQuery(Query query) {
private co.elastic.clients.elasticsearch._types.query_dsl.Query getQuery(@Nullable Query query,
@Nullable Class<?> clazz) {
if (query == null) {
return null;
}
elasticsearchConverter.updateQuery(query, clazz);
// todo #1973 some native stuff
co.elastic.clients.elasticsearch._types.query_dsl.Query esQuery = null;
if (query instanceof CriteriaQuery) {
@ -1074,7 +1328,7 @@ class RequestConverter {
} else if (query instanceof StringQuery) {
// no filter for StringQuery
} else if (query instanceof NativeQuery) {
// todo #1973 NativeQuery filter
builder.postFilter(((NativeQuery) query).getFilter());
} else {
throw new IllegalArgumentException("unhandled Query implementation " + query.getClass().getName());
}
@ -1128,6 +1382,7 @@ class RequestConverter {
return moreLikeThisQuery;
}
// endregion
// region helper functions

View File

@ -23,6 +23,7 @@ import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.cluster.HealthResponse;
import co.elastic.clients.elasticsearch.core.DeleteByQueryResponse;
import co.elastic.clients.elasticsearch.core.UpdateByQueryResponse;
import co.elastic.clients.elasticsearch.core.mget.MultiGetError;
import co.elastic.clients.elasticsearch.core.mget.MultiGetResponseItem;
import co.elastic.clients.elasticsearch.indices.*;
@ -314,6 +315,56 @@ class ResponseConverter {
}
public ByQueryResponse byQueryResponse(DeleteByQueryResponse response) {
// the code for the methods taking a DeleteByQueryResponse or a UpdateByQueryResponse is duplicated because the
// Elasticsearch responses do not share a common class
// noinspection DuplicatedCode
List<ByQueryResponse.Failure> failures = response.failures().stream().map(this::byQueryResponseFailureOf)
.collect(Collectors.toList());
ByQueryResponse.ByQueryResponseBuilder builder = ByQueryResponse.builder();
if (response.took() != null) {
builder.withTook(response.took());
}
if (response.timedOut() != null) {
builder.withTimedOut(response.timedOut());
}
if (response.total() != null) {
builder.withTotal(response.total());
}
if (response.deleted() != null) {
builder.withDeleted(response.deleted());
}
if (response.batches() != null) {
builder.withBatches(Math.toIntExact(response.batches()));
}
if (response.versionConflicts() != null) {
builder.withVersionConflicts(response.versionConflicts());
}
if (response.noops() != null) {
builder.withNoops(response.noops());
}
if (response.retries() != null) {
builder.withBulkRetries(response.retries().bulk());
builder.withSearchRetries(response.retries().search());
}
builder.withFailures(failures);
return builder.build();
}
public ByQueryResponse byQueryResponse(UpdateByQueryResponse response) {
// the code for the methods taking a DeleteByQueryResponse or a UpdateByQueryResponse is duplicated because the
// Elasticsearch responses do not share a common class
// noinspection DuplicatedCode
List<ByQueryResponse.Failure> failures = response.failures().stream().map(this::byQueryResponseFailureOf)
.collect(Collectors.toList());
@ -358,8 +409,8 @@ class ResponseConverter {
}
// endregion
// region helper functions
private long timeToLong(Time time) {
if (time.isTime()) {

View File

@ -20,7 +20,9 @@ import co.elastic.clients.elasticsearch._types.DistanceUnit;
import co.elastic.clients.elasticsearch._types.GeoDistanceType;
import co.elastic.clients.elasticsearch._types.OpType;
import co.elastic.clients.elasticsearch._types.Refresh;
import co.elastic.clients.elasticsearch._types.Result;
import co.elastic.clients.elasticsearch._types.SortMode;
import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch._types.VersionType;
import co.elastic.clients.elasticsearch._types.mapping.FieldType;
import co.elastic.clients.elasticsearch.core.search.BoundaryScanner;
@ -30,11 +32,16 @@ import co.elastic.clients.elasticsearch.core.search.HighlighterFragmenter;
import co.elastic.clients.elasticsearch.core.search.HighlighterOrder;
import co.elastic.clients.elasticsearch.core.search.HighlighterTagsSchema;
import co.elastic.clients.elasticsearch.core.search.HighlighterType;
import co.elastic.clients.elasticsearch.core.search.ScoreMode;
import java.time.Duration;
import org.springframework.data.elasticsearch.core.RefreshPolicy;
import org.springframework.data.elasticsearch.core.query.GeoDistanceOrder;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.Order;
import org.springframework.data.elasticsearch.core.query.RescorerQuery;
import org.springframework.data.elasticsearch.core.query.UpdateResponse;
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest;
import org.springframework.lang.Nullable;
@ -243,6 +250,54 @@ final class TypeUtils {
}
}
@Nullable
static UpdateResponse.Result result(@Nullable Result result) {
if (result == null) {
return null;
}
switch (result) {
case Created:
return UpdateResponse.Result.CREATED;
case Updated:
return UpdateResponse.Result.UPDATED;
case Deleted:
return UpdateResponse.Result.DELETED;
case NotFound:
return UpdateResponse.Result.NOT_FOUND;
case NoOp:
return UpdateResponse.Result.NOOP;
}
return null;
}
@Nullable
static ScoreMode scoreMode(@Nullable RescorerQuery.ScoreMode scoreMode) {
if (scoreMode == null) {
return null;
}
switch (scoreMode) {
case Default:
return null;
case Avg:
return ScoreMode.Avg;
case Max:
return ScoreMode.Max;
case Min:
return ScoreMode.Min;
case Total:
return ScoreMode.Total;
case Multiply:
return ScoreMode.Multiply;
}
return null;
}
@Nullable
static SortMode sortMode(Order.Mode mode) {
@ -260,6 +315,16 @@ final class TypeUtils {
return null;
}
@Nullable
static Time time(@Nullable Duration duration) {
if (duration == null) {
return null;
}
return Time.of(t -> t.time(duration.toMillis() + "ms"));
}
@Nullable
static VersionType versionType(
@Nullable org.springframework.data.elasticsearch.annotations.Document.VersionType versionType) {

View File

@ -443,7 +443,7 @@ abstract public class AbstractReactiveElasticsearchTemplate
IndexCoordinates index);
@Override
public Flux<AggregationContainer<?>> aggregate(Query query, Class<?> entityType) {
public Flux<? extends AggregationContainer<?>> aggregate(Query query, Class<?> entityType) {
return aggregate(query, entityType, getIndexCoordinatesFor(entityType));
}

View File

@ -22,8 +22,8 @@ import java.util.List;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.suggest.response.Suggest;
@ -237,7 +237,7 @@ public interface ReactiveSearchOperations {
* @return a {@link Flux} emitting matching aggregations one by one.
* @since 4.0
*/
Flux<AggregationContainer<?>> aggregate(Query query, Class<?> entityType);
Flux<? extends AggregationContainer<?>> aggregate(Query query, Class<?> entityType);
/**
* Perform an aggregation specified by the given {@link Query query}. <br />
@ -248,7 +248,7 @@ public interface ReactiveSearchOperations {
* @return a {@link Flux} emitting matching aggregations one by one.
* @since 4.0
*/
Flux<AggregationContainer<?>> aggregate(Query query, Class<?> entityType, IndexCoordinates index);
Flux<? extends AggregationContainer<?>> aggregate(Query query, Class<?> entityType, IndexCoordinates index);
/**
* Does a suggest query

View File

@ -62,15 +62,15 @@ public class BaseQuery implements Query {
@Nullable protected Integer maxResults;
@Nullable protected HighlightQuery highlightQuery;
@Nullable private Boolean trackTotalHits;
@Nullable private Integer trackTotalHitsUpTo;
@Nullable private Duration scrollTime;
@Nullable private Duration timeout;
@Nullable protected Integer trackTotalHitsUpTo;
@Nullable protected Duration scrollTime;
@Nullable protected Duration timeout;
private boolean explain = false;
@Nullable private List<Object> searchAfter;
@Nullable protected List<Object> searchAfter;
protected List<RescorerQuery> rescorerQueries = new ArrayList<>();
@Nullable protected Boolean requestCache;
private List<IdWithRouting> idsWithRouting = Collections.emptyList();
private final List<RuntimeField> runtimeFields = new ArrayList<>();
protected List<IdWithRouting> idsWithRouting = Collections.emptyList();
protected final List<RuntimeField> runtimeFields = new ArrayList<>();
public BaseQuery() {}
@ -86,6 +86,8 @@ public class BaseQuery implements Query {
this.preference = builder.getPreference();
this.sourceFilter = builder.getSourceFilter();
this.fields = builder.getFields();
this.highlightQuery = builder.highlightQuery;
this.route = builder.getRoute();
// #1973 add the other fields to the builder
}

View File

@ -44,6 +44,8 @@ public abstract class BaseQueryBuilder<Q extends BaseQuery, SELF extends BaseQue
@Nullable private String preference;
@Nullable private SourceFilter sourceFilter;
private List<String> fields = new ArrayList<>();
@Nullable protected HighlightQuery highlightQuery;
@Nullable private String route;
@Nullable
public Pageable getPageable() {
@ -92,6 +94,16 @@ public abstract class BaseQueryBuilder<Q extends BaseQuery, SELF extends BaseQue
return fields;
}
@Nullable
public HighlightQuery getHighlightQuery() {
return highlightQuery;
}
@Nullable
public String getRoute() {
return route;
}
public SELF withPageable(Pageable pageable) {
this.pageable = pageable;
return self();
@ -156,6 +168,16 @@ public abstract class BaseQueryBuilder<Q extends BaseQuery, SELF extends BaseQue
return self();
}
public SELF withHighlightQuery(HighlightQuery highlightQuery) {
this.highlightQuery = highlightQuery;
return self();
}
public SELF withRoute(String route) {
this.route = route;
return self();
}
public abstract Q build();
private SELF self() {

View File

@ -65,7 +65,6 @@ public class NativeSearchQueryBuilder extends BaseQueryBuilder<NativeSearchQuery
@Nullable private CollapseBuilder collapseBuilder;
@Nullable private List<IndexBoost> indicesBoost = new ArrayList<>();
@Nullable private SearchTemplateRequestBuilder searchTemplateBuilder;
@Nullable private String route;
@Nullable private SearchType searchType;
@Nullable private Boolean trackTotalHits;
@Nullable private Duration timeout;
@ -232,11 +231,6 @@ public class NativeSearchQueryBuilder extends BaseQueryBuilder<NativeSearchQuery
return this;
}
public NativeSearchQueryBuilder withRoute(String route) {
this.route = route;
return this;
}
public NativeSearchQueryBuilder withSearchType(SearchType searchType) {
this.searchType = searchType;
return this;
@ -320,10 +314,6 @@ public class NativeSearchQueryBuilder extends BaseQueryBuilder<NativeSearchQuery
nativeSearchQuery.setPipelineAggregations(pipelineAggregationBuilders);
}
if (route != null) {
nativeSearchQuery.setRoute(route);
}
if (searchType != null) {
nativeSearchQuery.setSearchType(Query.SearchType.valueOf(searchType.name()));
}

View File

@ -0,0 +1,70 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.query;
import java.util.Map;
import org.springframework.data.elasticsearch.core.ScriptType;
import org.springframework.lang.Nullable;
/**
* value class combining script information.
*
* @author Peter-Josef Meisch
* @since 4.4
*/
public final class ScriptData {
@Nullable private final ScriptType type;
@Nullable private final String language;
@Nullable private final String script;
@Nullable private final String scriptName;
@Nullable private final Map<String, Object> params;
public ScriptData(@Nullable ScriptType type, @Nullable String language, @Nullable String script,
@Nullable String scriptName, @Nullable Map<String, Object> params) {
this.type = type;
this.language = language;
this.script = script;
this.scriptName = scriptName;
this.params = params;
}
@Nullable
public ScriptType getType() {
return type;
}
@Nullable
public String getLanguage() {
return language;
}
@Nullable
public String getScript() {
return script;
}
@Nullable
public String getScriptName() {
return scriptName;
}
@Nullable
public Map<String, Object> getParams() {
return params;
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.query;
import org.springframework.util.Assert;
/**
* @author Peter-Josef Meisch
* @since 4.4
*/
public class ScriptedField {
private final String fieldName;
private final ScriptData scriptData;
public ScriptedField(String fieldName, ScriptData scriptData) {
Assert.notNull(fieldName, "fieldName must not be null");
Assert.notNull(scriptData, "scriptData must not be null");
this.fieldName = fieldName;
this.scriptData = scriptData;
}
public String getFieldName() {
return fieldName;
}
public ScriptData getScriptData() {
return scriptData;
}
}

View File

@ -35,11 +35,8 @@ import org.springframework.lang.Nullable;
public class UpdateQuery {
private final String id;
@Nullable private final String script;
@Nullable private final Map<String, Object> params;
@Nullable private final Document document;
@Nullable private final Document upsert;
@Nullable private final String lang;
@Nullable private final String routing;
@Nullable private final Boolean scriptedUpsert;
@Nullable private final Boolean docAsUpsert;
@ -61,9 +58,8 @@ public class UpdateQuery {
@Nullable private final Float requestsPerSecond;
@Nullable private final Boolean shouldStoreResult;
@Nullable private final Integer slices;
@Nullable private final ScriptType scriptType;
@Nullable private final String scriptName;
@Nullable private final String indexName;
@Nullable private final ScriptData scriptData;
public static Builder builder(String id) {
return new Builder(id);
@ -85,11 +81,8 @@ public class UpdateQuery {
@Nullable String scriptName, @Nullable String indexName) {
this.id = id;
this.script = script;
this.params = params;
this.document = document;
this.upsert = upsert;
this.lang = lang;
this.routing = routing;
this.scriptedUpsert = scriptedUpsert;
this.docAsUpsert = docAsUpsert;
@ -111,9 +104,13 @@ public class UpdateQuery {
this.requestsPerSecond = requestsPerSecond;
this.shouldStoreResult = shouldStoreResult;
this.slices = slices;
this.scriptType = scriptType;
this.scriptName = scriptName;
this.indexName = indexName;
if (scriptType != null || lang != null || script != null || scriptName != null || params != null) {
this.scriptData = new ScriptData(scriptType, lang, script, scriptName, params);
} else {
this.scriptData = null;
}
}
public String getId() {
@ -122,12 +119,12 @@ public class UpdateQuery {
@Nullable
public String getScript() {
return script;
return scriptData != null ? scriptData.getScript() : null;
}
@Nullable
public Map<String, Object> getParams() {
return params;
return scriptData != null ? scriptData.getParams() : null;
}
@Nullable
@ -142,7 +139,7 @@ public class UpdateQuery {
@Nullable
public String getLang() {
return lang;
return scriptData != null ? scriptData.getLanguage() : null;
}
@Nullable
@ -252,12 +249,12 @@ public class UpdateQuery {
@Nullable
public ScriptType getScriptType() {
return scriptType;
return scriptData != null ? scriptData.getType() : null;
}
@Nullable
public String getScriptName() {
return scriptName;
return scriptData != null ? scriptData.getScriptName() : null;
}
/**
@ -268,13 +265,21 @@ public class UpdateQuery {
return indexName;
}
/**
* @since 4.4
*/
@Nullable
public ScriptData getScriptData() {
return scriptData;
}
public static final class Builder {
private String id;
@Nullable private String script = null;
@Nullable private Map<String, Object> params;
@Nullable private Document document = null;
@Nullable private Document upsert = null;
@Nullable private String lang = "painless";
@Nullable private String lang = null;
@Nullable private String routing = null;
@Nullable private Boolean scriptedUpsert;
@Nullable private Boolean docAsUpsert;

View File

@ -35,6 +35,13 @@ public class UpdateResponse {
this.result = result;
}
/**
* @since 4.4
*/
public static UpdateResponse of(Result result) {
return new UpdateResponse(result);
}
public Result getResult() {
return result;
}

View File

@ -30,6 +30,17 @@ public class Highlight {
private final HighlightParameters parameters;
private final List<HighlightField> fields;
/**
* @since 4.4
*/
public Highlight(List<HighlightField> fields) {
Assert.notNull(fields, "fields must not be null");
this.parameters = HighlightParameters.builder().build();
this.fields = fields;
}
public Highlight(HighlightParameters parameters, List<HighlightField> fields) {
Assert.notNull(parameters, "parameters must not be null");

View File

@ -25,6 +25,17 @@ public class HighlightField {
private final String name;
private final HighlightFieldParameters parameters;
/**
* @since 4.4
*/
public HighlightField(String name) {
Assert.notNull(name, "name must not be null");
this.name = name;
this.parameters = HighlightFieldParameters.builder().build();
}
public HighlightField(String name, HighlightFieldParameters parameters) {
Assert.notNull(name, "name must not be null");

View File

@ -0,0 +1,57 @@
/*
// * Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch;
import static org.springframework.data.elasticsearch.client.elc.QueryBuilders.*;
import co.elastic.clients.elasticsearch._types.aggregations.Aggregation;
import org.springframework.data.elasticsearch.client.elc.NativeQuery;
import org.springframework.data.elasticsearch.client.elc.QueryBuilders;
import org.springframework.data.elasticsearch.core.query.BaseQueryBuilder;
import org.springframework.data.elasticsearch.core.query.Query;
/**
* Class providing some queries for the new Elasticsearch client needed in different tests.
*
* @author Peter-Josef Meisch
* @since 4.4
*/
public final class ELCQueries {
private ELCQueries() {}
public static Query getTermsAggsQuery(String aggsName, String aggsField){
return NativeQuery.builder() //
.withQuery(QueryBuilders.matchAllQueryAsQuery()) //
.withAggregation(aggsName, Aggregation.of(a -> a //
.terms(ta -> ta.field(aggsField)))) //
.withMaxResults(0) //
.build();
}
public static Query queryWithIds(String... ids) {
return NativeQuery.builder().withIds(ids).build();
}
public static BaseQueryBuilder<?, ?> getBuilderWithMatchAllQuery() {
return NativeQuery.builder().withQuery(matchAllQueryAsQuery());
}
public static BaseQueryBuilder<?, ?> getBuilderWithTermQuery(String field, String value) {
return NativeQuery.builder().withQuery(termQueryAsQuery(field, value));
}
}

View File

@ -18,16 +18,29 @@ package org.springframework.data.elasticsearch.core;
import static org.springframework.data.elasticsearch.client.elc.QueryBuilders.*;
import co.elastic.clients.elasticsearch._types.query_dsl.BoolQuery;
import co.elastic.clients.elasticsearch._types.query_dsl.FunctionBoostMode;
import co.elastic.clients.elasticsearch._types.query_dsl.FunctionScoreMode;
import co.elastic.clients.elasticsearch.core.search.FieldCollapse;
import co.elastic.clients.json.JsonData;
import java.util.Map;
import org.junit.jupiter.api.DisplayName;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.data.elasticsearch.ELCQueries;
import org.springframework.data.elasticsearch.client.elc.NativeQuery;
import org.springframework.data.elasticsearch.client.elc.NativeQueryBuilder;
import org.springframework.data.elasticsearch.core.query.BaseQueryBuilder;
import org.springframework.data.elasticsearch.core.query.FetchSourceFilterBuilder;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.RescorerQuery;
import org.springframework.data.elasticsearch.core.query.ScriptData;
import org.springframework.data.elasticsearch.core.query.ScriptedField;
import org.springframework.data.elasticsearch.junit.jupiter.ElasticsearchTemplateConfiguration;
import org.springframework.data.elasticsearch.utils.IndexNameProvider;
import org.springframework.lang.Nullable;
import org.springframework.test.context.ContextConfiguration;
/**
@ -54,7 +67,7 @@ public class ElasticsearchELCIntegrationTests extends ElasticsearchIntegrationTe
@Override
protected Query queryWithIds(String... ids) {
return NativeQuery.builder().withIds(ids).build();
return ELCQueries.queryWithIds(ids);
}
@Override
@ -64,7 +77,7 @@ public class ElasticsearchELCIntegrationTests extends ElasticsearchIntegrationTe
@Override
protected BaseQueryBuilder<?, ?> getBuilderWithMatchAllQuery() {
return NativeQuery.builder().withQuery(matchAllQueryAsQuery());
return ELCQueries.getBuilderWithMatchAllQuery();
}
@Override
@ -93,4 +106,101 @@ public class ElasticsearchELCIntegrationTests extends ElasticsearchIntegrationTe
.withMinScore(minScore) //
.build();
}
@Override
protected Query getQueryWithCollapse(String collapseField, @Nullable String innerHits, @Nullable Integer size) {
return NativeQuery.builder() //
.withQuery(matchAllQueryAsQuery()) //
.withFieldCollapse(FieldCollapse.of(fc -> {
fc.field(collapseField);
if (innerHits != null) {
fc.innerHits(ih -> ih.name(innerHits).size(size));
}
return fc;
})).build();
}
@Override
protected Query getMatchAllQueryWithFilterForId(String id) {
return NativeQuery.builder() //
.withQuery(matchAllQueryAsQuery()) //
.withFilter(termQueryAsQuery("id", id)) //
.build();
}
@Override
protected Query getQueryForParentId(String type, String id, @Nullable String route) {
NativeQueryBuilder queryBuilder = NativeQuery.builder() //
.withQuery(qb -> qb //
.parentId(p -> p.type(type).id(id)) //
);
if (route != null) {
queryBuilder.withRoute(route);
}
return queryBuilder.build();
}
@Override
protected Query getMatchAllQueryWithIncludesAndInlineExpressionScript(@Nullable String includes, String fieldName,
String script, Map<String, Object> params) {
NativeQueryBuilder nativeQueryBuilder = NativeQuery.builder().withQuery(matchAllQueryAsQuery());
if (includes != null) {
nativeQueryBuilder.withSourceFilter(new FetchSourceFilterBuilder().withIncludes(includes).build());
}
return nativeQueryBuilder.withScriptedField(new ScriptedField( //
fieldName, //
new ScriptData(ScriptType.INLINE, "expression", script, null, params))) //
.build();
}
@Override
protected Query getQueryWithRescorer() {
return NativeQuery.builder() //
.withQuery(q -> q //
.bool(b -> b //
.filter(f -> f.exists(e -> e.field("rate"))) //
.should(s -> s.term(t -> t.field("message").value("message"))) //
)) //
.withResorerQuery( //
new RescorerQuery(NativeQuery.builder() //
.withQuery(q -> q //
.functionScore(fs -> fs //
.functions(f1 -> f1 //
.filter(matchAllQueryAsQuery()) //
.weight(1.0) //
.gauss(d -> d //
.field("rate") //
.placement(dp -> dp //
.origin(JsonData.of(0)) //
.scale(JsonData.of(10)) //
.decay(0.5)) //
)) //
.functions(f2 -> f2 //
.filter(matchAllQueryAsQuery()).weight(100.0) //
.gauss(d -> d //
.field("rate") //
.placement(dp -> dp //
.origin(JsonData.of(0)) //
.scale(JsonData.of(10)) //
.decay(0.5)) //
)) //
.scoreMode(FunctionScoreMode.Sum) //
.maxBoost(80.0) //
.boostMode(FunctionBoostMode.Replace)) //
) //
.build() //
) //
.withScoreMode(RescorerQuery.ScoreMode.Max) //
.withWindowSize(100)) //
.build();
}
}

View File

@ -27,8 +27,17 @@ import java.util.Map;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.lucene.search.function.CombineFunction;
import org.elasticsearch.common.lucene.search.function.FunctionScoreQuery;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.InnerHitBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder;
import org.elasticsearch.index.query.functionscore.GaussDecayFunctionBuilder;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.join.query.ParentIdQueryBuilder;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.collapse.CollapseBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.json.JSONException;
import org.junit.jupiter.api.DisplayName;
@ -38,12 +47,16 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.BaseQueryBuilder;
import org.springframework.data.elasticsearch.core.query.FetchSourceFilterBuilder;
import org.springframework.data.elasticsearch.core.query.IndicesOptions;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.RescorerQuery;
import org.springframework.data.elasticsearch.core.query.ScriptField;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.data.elasticsearch.junit.jupiter.ElasticsearchRestTemplateConfiguration;
import org.springframework.data.elasticsearch.utils.IndexNameProvider;
import org.springframework.lang.Nullable;
import org.springframework.test.context.ContextConfiguration;
/**
@ -115,6 +128,79 @@ public class ElasticsearchERHLCIntegrationTests extends ElasticsearchIntegration
.withMinScore(minScore).build();
}
@Override
protected Query getQueryWithCollapse(String collapseField, @Nullable String innerHits, @Nullable Integer size) {
CollapseBuilder collapseBuilder = new CollapseBuilder(collapseField);
if (innerHits != null) {
InnerHitBuilder innerHitBuilder = new InnerHitBuilder(innerHits);
if (size != null) {
innerHitBuilder.setSize(size);
}
collapseBuilder.setInnerHits(innerHitBuilder);
}
return new NativeSearchQueryBuilder().withQuery(matchAllQuery()).withCollapseBuilder(collapseBuilder).build();
}
@Override
protected Query getMatchAllQueryWithFilterForId(String id) {
return new NativeSearchQueryBuilder().withQuery(matchAllQuery()).withFilter(boolQuery().filter(termQuery("id", id)))
.build();
}
@Override
protected Query getQueryForParentId(String type, String id, @Nullable String route) {
NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder()
.withQuery(new ParentIdQueryBuilder(type, id));
if (route != null) {
queryBuilder.withRoute(route);
}
return queryBuilder.build();
}
@Override
protected Query getMatchAllQueryWithIncludesAndInlineExpressionScript(@Nullable String includes, String fieldName,
String script, Map<String, java.lang.Object> params) {
NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder().withQuery(matchAllQuery());
if (includes != null) {
nativeSearchQueryBuilder.withSourceFilter(new FetchSourceFilterBuilder().withIncludes(includes).build());
}
return nativeSearchQueryBuilder.withScriptField(new ScriptField(fieldName,
new Script(org.elasticsearch.script.ScriptType.INLINE, "expression", script, params))).build();
}
@Override
protected Query getQueryWithRescorer() {
return new NativeSearchQueryBuilder() //
.withQuery( //
boolQuery() //
.filter(existsQuery("rate")) //
.should(termQuery("message", "message"))) //
.withRescorerQuery( //
new RescorerQuery( //
new NativeSearchQueryBuilder() //
.withQuery(QueryBuilders
.functionScoreQuery(new FunctionScoreQueryBuilder.FilterFunctionBuilder[] {
new FunctionScoreQueryBuilder.FilterFunctionBuilder(
new GaussDecayFunctionBuilder("rate", 0, 10, null, 0.5).setWeight(1f)),
new FunctionScoreQueryBuilder.FilterFunctionBuilder(
new GaussDecayFunctionBuilder("rate", 0, 10, null, 0.5).setWeight(100f)) }) //
.scoreMode(FunctionScoreQuery.ScoreMode.SUM) //
.maxBoost(80f) //
.boostMode(CombineFunction.REPLACE)) //
.build())//
.withScoreMode(RescorerQuery.ScoreMode.Max) //
.withWindowSize(100)) //
.build();
}
@Test // DATAES-768
void shouldUseAllOptionsFromUpdateQuery() {
Map<String, Object> doc = new HashMap<>();

View File

@ -36,19 +36,6 @@ import java.util.stream.IntStream;
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.util.Lists;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.lucene.search.function.CombineFunction;
import org.elasticsearch.common.lucene.search.function.FunctionScoreQuery;
import org.elasticsearch.index.query.InnerHitBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder;
import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder.FilterFunctionBuilder;
import org.elasticsearch.index.query.functionscore.GaussDecayFunctionBuilder;
import org.elasticsearch.join.query.ParentIdQueryBuilder;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.collapse.CollapseBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Order;
@ -82,7 +69,8 @@ import org.springframework.data.elasticsearch.core.index.Settings;
import org.springframework.data.elasticsearch.core.join.JoinField;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.*;
import org.springframework.data.elasticsearch.core.query.RescorerQuery.ScoreMode;
import org.springframework.data.elasticsearch.core.query.highlight.Highlight;
import org.springframework.data.elasticsearch.core.query.highlight.HighlightField;
import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest;
import org.springframework.data.elasticsearch.utils.IndexNameProvider;
import org.springframework.data.util.StreamUtils;
@ -170,7 +158,23 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
protected abstract BaseQueryBuilder<?, ?> getBuilderWithWildcardQuery(String field, String value);
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
protected abstract Query getQueryWithCollapse(String collapseField, @Nullable String innerHits,
@Nullable Integer size);
protected abstract Query getMatchAllQueryWithFilterForId(String id);
protected abstract Query getQueryForParentId(String type, String id, @Nullable String route);
protected Query getMatchAllQueryWithInlineExpressionScript(String fieldName, String script,
Map<String, java.lang.Object> params) {
return getMatchAllQueryWithIncludesAndInlineExpressionScript(null, fieldName, script, params);
}
protected abstract Query getMatchAllQueryWithIncludesAndInlineExpressionScript(@Nullable String includes,
String fieldName, String script, Map<String, java.lang.Object> params);
protected abstract Query getQueryWithRescorer();
@Test
public void shouldThrowDataAccessExceptionIfDocumentDoesNotExistWhileDoingPartialUpdate() {
@ -451,7 +455,6 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThat(searchHits.getTotalHits()).isEqualTo(2);
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test
public void shouldDoBulkUpdate() {
@ -618,11 +621,9 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThat(operations.count(searchQuery, IndexCoordinates.of("test-index-*"))).isEqualTo(2);
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test
public void shouldFilterSearchResultsForGivenFilter() {
// given
String documentId = nextIdAsString();
SampleEntity sampleEntity = SampleEntity.builder().id(documentId).message("some message")
.version(System.currentTimeMillis()).build();
@ -630,14 +631,11 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
IndexQuery indexQuery = getIndexQuery(sampleEntity);
operations.index(indexQuery, IndexCoordinates.of(indexNameProvider.indexName()));
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery())
.withFilter(boolQuery().filter(termQuery("id", documentId))).build();
Query query = getMatchAllQueryWithFilterForId(documentId);
// when
SearchHits<SampleEntity> searchHits = operations.search(searchQuery, SampleEntity.class,
SearchHits<SampleEntity> searchHits = operations.search(query, SampleEntity.class,
IndexCoordinates.of(indexNameProvider.indexName()));
// then
assertThat(searchHits.getTotalHits()).isEqualTo(1);
}
@ -825,7 +823,6 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThat(searchHits.getTotalHits()).isEqualTo(1);
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test
public void shouldUseScriptedFields() {
@ -847,10 +844,8 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
params.put("factor", 2);
// when
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).withScriptField(
new ScriptField("scriptedRate", new Script(ScriptType.INLINE, "expression", "doc['rate'] * factor", params)))
.build();
SearchHits<SampleEntity> searchHits = operations.search(searchQuery, SampleEntity.class,
Query query = getMatchAllQueryWithInlineExpressionScript("scriptedRate", "doc['rate'] * factor", params);
SearchHits<SampleEntity> searchHits = operations.search(query, SampleEntity.class,
IndexCoordinates.of(indexNameProvider.indexName()));
// then
@ -1502,7 +1497,6 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThat(indexOperations.exists()).isFalse();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test
public void shouldDoPartialUpdateForExistingDocument() {
@ -1534,10 +1528,9 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThat(indexedEntity.getMessage()).isEqualTo(messageAfterUpdate);
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test
void shouldDoUpdateByQueryForExistingDocument() {
// given
final String documentId = nextIdAsString();
final String messageBeforeUpdate = "some test message";
final String messageAfterUpdate = "test message";
@ -1549,7 +1542,7 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
operations.index(indexQuery, IndexCoordinates.of(indexNameProvider.indexName()));
final NativeSearchQuery query = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build();
final Query query = operations.matchAllQuery();
final UpdateQuery updateQuery = UpdateQuery.builder(query)
.withScriptType(org.springframework.data.elasticsearch.core.ScriptType.INLINE)
@ -1557,42 +1550,15 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
.withParams(Collections.singletonMap("newMessage", messageAfterUpdate)).withAbortOnVersionConflict(true)
.build();
// when
operations.updateByQuery(updateQuery, IndexCoordinates.of(indexNameProvider.indexName()));
// then
SampleEntity indexedEntity = operations.get(documentId, SampleEntity.class,
IndexCoordinates.of(indexNameProvider.indexName()));
assertThat(indexedEntity.getMessage()).isEqualTo(messageAfterUpdate);
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-227
public void shouldUseUpsertOnUpdate() {
// given
Map<String, Object> doc = new HashMap<>();
doc.put("id", "1");
doc.put("message", "test");
org.springframework.data.elasticsearch.core.document.Document document = org.springframework.data.elasticsearch.core.document.Document
.from(doc);
UpdateQuery updateQuery = UpdateQuery.builder("1") //
.withDocument(document) //
.withUpsert(document) //
.build();
// when
UpdateRequest request = getRequestFactory().updateRequest(updateQuery, IndexCoordinates.of("index"));
// then
assertThat(request).isNotNull();
assertThat(request.upsertRequest()).isNotNull();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test
public void shouldDoUpsertIfDocumentDoesNotExist() {
@ -1650,63 +1616,57 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThat(entities.size()).isGreaterThanOrEqualTo(1);
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@DisabledIf(value = "newElasticsearchClient",
disabledReason = "todo #1973 can't check response, open ES issue 161 that does not allow seqno")
// and version to be set in the request
@Test // DATAES-487
public void shouldReturnSameEntityForMultiSearch() {
// given
List<IndexQuery> indexQueries = new ArrayList<>();
indexQueries.add(buildIndex(SampleEntity.builder().id("1").message("ab").build()));
indexQueries.add(buildIndex(SampleEntity.builder().id("2").message("bc").build()));
indexQueries.add(buildIndex(SampleEntity.builder().id("3").message("ac").build()));
operations.bulkIndex(indexQueries, IndexCoordinates.of(indexNameProvider.indexName()));
List<Query> queries = new ArrayList<>();
queries.add(getTermQuery("message", "ab"));
queries.add(getTermQuery("message", "bc"));
queries.add(getTermQuery("message", "ac"));
// when
List<NativeSearchQuery> queries = new ArrayList<>();
queries.add(new NativeSearchQueryBuilder().withQuery(termQuery("message", "ab")).build());
queries.add(new NativeSearchQueryBuilder().withQuery(termQuery("message", "bc")).build());
queries.add(new NativeSearchQueryBuilder().withQuery(termQuery("message", "ac")).build());
// then
List<SearchHits<SampleEntity>> searchHits = operations.multiSearch(queries, SampleEntity.class,
IndexCoordinates.of(indexNameProvider.indexName()));
for (SearchHits<SampleEntity> sampleEntity : searchHits) {
assertThat(sampleEntity.getTotalHits()).isEqualTo(1);
}
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@DisabledIf(value = "newElasticsearchClient",
disabledReason = "todo #1973 can't check response, open ES issue 161 that does not allow seqno")
// and version to be set in the request
@Test // DATAES-487
public void shouldReturnDifferentEntityForMultiSearch() {
// given
Class<Book> clazz = Book.class;
IndexOperations bookIndexOperations = operations.indexOps(Book.class);
bookIndexOperations.delete();
bookIndexOperations.create();
indexOperations.putMapping(clazz);
bookIndexOperations.createWithMapping();
bookIndexOperations.refresh();
IndexCoordinates bookIndex = IndexCoordinates.of("test-index-book-core-template");
IndexCoordinates bookIndex = IndexCoordinates.of("i-need-my-own-index");
operations.index(buildIndex(SampleEntity.builder().id("1").message("ab").build()),
IndexCoordinates.of(indexNameProvider.indexName()));
operations.index(buildIndex(Book.builder().id("2").description("bc").build()), bookIndex);
bookIndexOperations.refresh();
// when
List<NativeSearchQuery> queries = new ArrayList<>();
queries.add(new NativeSearchQueryBuilder().withQuery(termQuery("message", "ab")).build());
queries.add(new NativeSearchQueryBuilder().withQuery(termQuery("description", "bc")).build());
List<Query> queries = new ArrayList<>();
queries.add(getTermQuery("message", "ab"));
queries.add(getTermQuery("description", "bc"));
List<SearchHits<?>> searchHitsList = operations.multiSearch(queries, Lists.newArrayList(SampleEntity.class, clazz),
IndexCoordinates.of(indexNameProvider.indexName(), bookIndex.getIndexName()));
// then
bookIndexOperations.delete();
SearchHits<?> searchHits0 = searchHitsList.get(0);
assertThat(searchHits0.getTotalHits()).isEqualTo(1L);
SearchHit<SampleEntity> searchHit0 = (SearchHit<SampleEntity>) searchHits0.getSearchHit(0);
@ -1950,7 +1910,7 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
@Test
public void shouldIndexSampleEntityWithIndexAtRuntime() {
String indexName = "custom-" + indexNameProvider.indexName();
String indexName = indexNameProvider.indexName() + "-custom";
// given
String documentId = nextIdAsString();
@ -2699,7 +2659,6 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThat(sampleEntities.get(2).getContent().getMessage()).isEqualTo(sampleEntity1.getMessage());
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-593
public void shouldReturnDocumentWithCollapsedField() {
@ -2715,11 +2674,9 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
operations.bulkIndex(indexQueries, IndexCoordinates.of(indexNameProvider.indexName()));
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).withCollapseField("rate")
.build();
Query query = getQueryWithCollapse("rate", null, null);
// when
SearchHits<SampleEntity> searchHits = operations.search(searchQuery, SampleEntity.class,
SearchHits<SampleEntity> searchHits = operations.search(query, SampleEntity.class,
IndexCoordinates.of(indexNameProvider.indexName()));
// then
@ -2730,7 +2687,6 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThat(searchHits.getSearchHit(1).getContent().getMessage()).isEqualTo("message 2");
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // #1493
@DisplayName("should return document with collapse field and inner hits")
public void shouldReturnDocumentWithCollapsedFieldAndInnerHits() {
@ -2747,11 +2703,10 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
operations.bulkIndex(indexQueries, IndexCoordinates.of(indexNameProvider.indexName()));
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery())
.withCollapseBuilder(new CollapseBuilder("rate").setInnerHits(new InnerHitBuilder("innerHits"))).build();
Query query = getQueryWithCollapse("rate", "innerHits", null);
// when
SearchHits<SampleEntity> searchHits = operations.search(searchQuery, SampleEntity.class,
SearchHits<SampleEntity> searchHits = operations.search(query, SampleEntity.class,
IndexCoordinates.of(indexNameProvider.indexName()));
// then
@ -2764,7 +2719,7 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThat(searchHits.getSearchHit(1).getInnerHits("innerHits").getTotalHits()).isEqualTo(1);
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // #1997
@DisplayName("should return document with inner hits size zero")
void shouldReturnDocumentWithInnerHitsSizeZero() {
@ -2777,12 +2732,10 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
operations.bulkIndex(indexQueries, IndexCoordinates.of(indexNameProvider.indexName()));
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery())
.withCollapseBuilder(new CollapseBuilder("rate").setInnerHits(new InnerHitBuilder("innerHits").setSize(0)))
.build();
Query query = getQueryWithCollapse("rate", "innerHits", 0);
// when
SearchHits<SampleEntity> searchHits = operations.search(searchQuery, SampleEntity.class,
SearchHits<SampleEntity> searchHits = operations.search(query, SampleEntity.class,
IndexCoordinates.of(indexNameProvider.indexName()));
// then
@ -2867,7 +2820,7 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
List<Object> sortValues = searchHit.getSortValues();
assertThat(sortValues).hasSize(2);
assertThat(sortValues.get(0)).isInstanceOf(String.class).isEqualTo("thousands");
// transport client returns Long, RestHghlevelClient Integer, new ElasticsearchClient String
// transport client returns Long, RestHighlevelClient Integer, new ElasticsearchClient String
java.lang.Object o = sortValues.get(1);
if (o instanceof Integer) {
Integer i = (Integer) o;
@ -2882,7 +2835,6 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
}
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-715
void shouldReturnHighlightFieldsInSearchHit() {
IndexCoordinates index = IndexCoordinates.of("test-index-highlight-entity-template");
@ -2893,11 +2845,10 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
operations.index(indexQuery, index);
operations.indexOps(index).refresh();
NativeSearchQuery query = new NativeSearchQueryBuilder() //
.withQuery(termQuery("message", "message")) //
.withHighlightFields(new HighlightBuilder.Field("message")) //
Query query = getBuilderWithTermQuery("message", "message") //
.withHighlightQuery(
new HighlightQuery(new Highlight(singletonList(new HighlightField("message"))), HighlightEntity.class))
.build();
SearchHits<HighlightEntity> searchHits = operations.search(query, HighlightEntity.class, index);
assertThat(searchHits).isNotNull();
@ -2910,10 +2861,9 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThat(highlightField.get(1)).contains("<em>message</em>");
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // #1686
void shouldRunRescoreQueryInSearchQuery() {
IndexCoordinates index = IndexCoordinates.of("test-index-rescore-entity-template");
IndexCoordinates index = IndexCoordinates.of(indexNameProvider.getPrefix() + "rescore-entity");
// matches main query better
SampleEntity entity = SampleEntity.builder() //
@ -2933,17 +2883,7 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
operations.bulkIndex(indexQueries, index);
NativeSearchQuery query = new NativeSearchQueryBuilder() //
.withQuery(boolQuery().filter(existsQuery("rate")).should(termQuery("message", "message"))) //
.withRescorerQuery(
new RescorerQuery(new NativeSearchQueryBuilder().withQuery(QueryBuilders
.functionScoreQuery(new FunctionScoreQueryBuilder.FilterFunctionBuilder[] {
new FilterFunctionBuilder(new GaussDecayFunctionBuilder("rate", 0, 10, null, 0.5).setWeight(1f)),
new FilterFunctionBuilder(
new GaussDecayFunctionBuilder("rate", 0, 10, null, 0.5).setWeight(100f)) })
.scoreMode(FunctionScoreQuery.ScoreMode.SUM).maxBoost(80f).boostMode(CombineFunction.REPLACE)).build())
.withScoreMode(ScoreMode.Max).withWindowSize(100))
.build();
Query query = getQueryWithRescorer();
SearchHits<SampleEntity> searchHits = operations.search(query, SampleEntity.class, index);
@ -2956,6 +2896,7 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThat(searchHit.getScore()).isEqualTo(80f);
}
@Test
// DATAES-738
void shouldSaveEntityWithIndexCoordinates() {
@ -3128,7 +3069,9 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThatSeqNoPrimaryTermIsFilled(retrieved);
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@DisabledIf(value = "newElasticsearchClient",
disabledReason = "todo #1973 can't check response, open ES issue 161 that does not allow seqno")
// and version to be set in the request
@Test // DATAES-799
void multiSearchShouldReturnSeqNoPrimaryTerm() {
OptimisticEntity original = new OptimisticEntity();
@ -3158,7 +3101,6 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThatSeqNoPrimaryTermIsFilled(retrieved);
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-799
void shouldThrowOptimisticLockingFailureExceptionWhenConcurrentUpdateOccursOnEntityWithSeqNoPrimaryTermProperty() {
OptimisticEntity original = new OptimisticEntity();
@ -3175,7 +3117,6 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThatThrownBy(() -> operations.save(forEdit2)).isInstanceOf(OptimisticLockingFailureException.class);
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-799
void shouldThrowOptimisticLockingFailureExceptionWhenConcurrentUpdateOccursOnVersionedEntityWithSeqNoPrimaryTermProperty() {
OptimisticAndVersionedEntity original = new OptimisticAndVersionedEntity();
@ -3204,7 +3145,6 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
operations.save(forEdit);
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test
void shouldSupportCRUDOpsForEntityWithJoinFields() throws Exception {
String qId1 = java.util.UUID.randomUUID().toString();
@ -3258,9 +3198,8 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
operations.save(
Arrays.asList(sampleQuestionEntity1, sampleQuestionEntity2, sampleAnswerEntity1, sampleAnswerEntity2), index);
SearchHits<SampleJoinEntity> hits = operations.search(
new NativeSearchQueryBuilder().withQuery(new ParentIdQueryBuilder("answer", qId1)).build(),
SampleJoinEntity.class);
Query query = getQueryForParentId("answer", qId1, null);
SearchHits<SampleJoinEntity> hits = operations.search(query, SampleJoinEntity.class);
List<String> hitIds = hits.getSearchHits().stream()
.map(sampleJoinEntitySearchHit -> sampleJoinEntitySearchHit.getId()).collect(Collectors.toList());
@ -3287,8 +3226,7 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
// when
operations.bulkUpdate(queries, IndexCoordinates.of(indexNameProvider.indexName()));
SearchHits<SampleJoinEntity> updatedHits = operations.search(
new NativeSearchQueryBuilder().withQuery(new ParentIdQueryBuilder("answer", qId2)).build(),
SearchHits<SampleJoinEntity> updatedHits = operations.search(getQueryForParentId("answer", qId2, null),
SampleJoinEntity.class);
List<String> hitIds = updatedHits.getSearchHits().stream().map(new Function<SearchHit<SampleJoinEntity>, String>() {
@ -3300,8 +3238,7 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThat(hitIds.size()).isEqualTo(1);
assertThat(hitIds.get(0)).isEqualTo(aId2);
updatedHits = operations.search(
new NativeSearchQueryBuilder().withQuery(new ParentIdQueryBuilder("answer", qId1)).build(),
updatedHits = operations.search(getQueryForParentId("answer", qId1, null),
SampleJoinEntity.class);
hitIds = updatedHits.getSearchHits().stream().map(new Function<SearchHit<SampleJoinEntity>, String>() {
@ -3315,20 +3252,15 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
}
private void shouldDeleteEntityWithJoinFields(String qId2, String aId2) throws Exception {
Query query = new NativeSearchQueryBuilder().withQuery(new ParentIdQueryBuilder("answer", qId2)).withRoute(qId2)
.build();
operations.delete(query, SampleJoinEntity.class, IndexCoordinates.of(indexNameProvider.indexName()));
SearchHits<SampleJoinEntity> deletedHits = operations.search(
new NativeSearchQueryBuilder().withQuery(new ParentIdQueryBuilder("answer", qId2)).build(),
operations.delete(getQueryForParentId("answer", qId2, qId2), SampleJoinEntity.class,
IndexCoordinates.of(indexNameProvider.indexName()));
SearchHits<SampleJoinEntity> deletedHits = operations.search(getQueryForParentId("answer", qId2, null),
SampleJoinEntity.class);
List<String> hitIds = deletedHits.getSearchHits().stream().map(new Function<SearchHit<SampleJoinEntity>, String>() {
@Override
public String apply(SearchHit<SampleJoinEntity> sampleJoinEntitySearchHit) {
return sampleJoinEntitySearchHit.getId();
}
}).collect(Collectors.toList());
List<String> hitIds = deletedHits.getSearchHits().stream()
.map(sampleJoinEntitySearchHit -> sampleJoinEntitySearchHit.getId()).collect(Collectors.toList());
assertThat(hitIds.size()).isEqualTo(0);
}
@ -3539,7 +3471,6 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThat(retrieved).isEqualTo(saved);
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // #1488
@DisplayName("should set scripted fields on immutable objects")
void shouldSetScriptedFieldsOnImmutableObjects() {
@ -3549,13 +3480,10 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
Map<String, Object> params = new HashMap<>();
params.put("factor", 2);
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery())
.withSourceFilter(new FetchSourceFilterBuilder().withIncludes("*").build())
.withScriptField(new ScriptField("scriptedRate",
new Script(ScriptType.INLINE, "expression", "doc['rate'] * factor", params)))
.build();
Query query = getMatchAllQueryWithIncludesAndInlineExpressionScript("*", "scriptedRate", "doc['rate'] * factor",
params);
SearchHits<ImmutableWithScriptedEntity> searchHits = operations.search(searchQuery,
SearchHits<ImmutableWithScriptedEntity> searchHits = operations.search(query,
ImmutableWithScriptedEntity.class);
assertThat(searchHits.getTotalHits()).isEqualTo(1);
@ -3565,6 +3493,7 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThat(foundEntity.getScriptedRate()).isEqualTo(84.0);
}
@Test // #1893
@DisplayName("should index document from source with version")
void shouldIndexDocumentFromSourceWithVersion() {
@ -3875,7 +3804,7 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
}
}
@Document(indexName = "test-index-book-core-template")
@Document(indexName = "i-need-my-own-index")
static class Book {
@Nullable
@Id private String id;
@ -4419,7 +4348,7 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
}
}
@Document(indexName = "immutable-class")
@Document(indexName = "#{@indexNameProvider.indexName()}-immutable")
private static final class ImmutableEntity {
@Id
@Nullable private final String id;
@ -4477,7 +4406,7 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
}
}
@Document(indexName = "immutable-scripted")
@Document(indexName = "#{@indexNameProvider.indexName()}-immutable-scripted")
public static final class ImmutableWithScriptedEntity {
@Id private final String id;
@Field(type = Integer)

View File

@ -15,11 +15,30 @@
*/
package org.springframework.data.elasticsearch.core;
import static org.assertj.core.api.Assertions.*;
import static org.springframework.data.elasticsearch.client.elc.QueryBuilders.*;
import co.elastic.clients.elasticsearch._types.aggregations.Aggregate;
import co.elastic.clients.elasticsearch._types.aggregations.Buckets;
import co.elastic.clients.elasticsearch._types.aggregations.StringTermsAggregate;
import co.elastic.clients.elasticsearch._types.aggregations.StringTermsBucket;
import co.elastic.clients.elasticsearch.core.search.FieldCollapse;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.data.elasticsearch.ELCQueries;
import org.springframework.data.elasticsearch.client.elc.Aggregation;
import org.springframework.data.elasticsearch.client.elc.ElasticsearchAggregation;
import org.springframework.data.elasticsearch.client.elc.NativeQuery;
import org.springframework.data.elasticsearch.core.query.BaseQueryBuilder;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.junit.jupiter.ReactiveElasticsearchTemplateConfiguration;
import org.springframework.data.elasticsearch.utils.IndexNameProvider;
import org.springframework.lang.Nullable;
import org.springframework.test.context.ContextConfiguration;
/**
@ -37,9 +56,66 @@ public class ReactiveElasticsearchELCIntegrationTests extends ReactiveElasticsea
return new IndexNameProvider("reactive-template");
}
}
@Override
protected Query getTermsAggsQuery(String aggsName, String aggsField) {
return ELCQueries.getTermsAggsQuery(aggsName, aggsField);
}
@Override
public boolean usesNewElasticsearchClient() {
return true;
protected BaseQueryBuilder<?, ?> getBuilderWithMatchAllQuery() {
return ELCQueries.getBuilderWithMatchAllQuery();
}
@Override
protected BaseQueryBuilder<?, ?> getBuilderWithTermQuery(String field, String value) {
return ELCQueries.getBuilderWithTermQuery(field, value);
}
@Override
protected Query getQueryWithCollapse(String collapseField, @Nullable String innerHits, @Nullable Integer size) {
return NativeQuery.builder() //
.withQuery(matchAllQueryAsQuery()) //
.withFieldCollapse(FieldCollapse.of(fc -> {
fc.field(collapseField);
if (innerHits != null) {
fc.innerHits(ih -> ih.name(innerHits).size(size));
}
return fc;
})).build();
}
@Override
protected Query queryWithIds(String... ids) {
return ELCQueries.queryWithIds(ids);
}
@Override
protected <A extends AggregationContainer<?>> void assertThatAggregationsAreCorrect(A aggregationContainer) {
Aggregation aggregation = ((ElasticsearchAggregation) aggregationContainer).aggregation();
assertThat(aggregation.getName()).isEqualTo("messages");
Aggregate aggregate = aggregation.getAggregate();
assertThat(aggregate.isSterms()).isTrue();
StringTermsAggregate parsedStringTerms = (StringTermsAggregate) aggregate.sterms();
Buckets<StringTermsBucket> buckets = parsedStringTerms.buckets();
assertThat(buckets.isArray()).isTrue();
List<StringTermsBucket> bucketList = buckets.array();
assertThat(bucketList.size()).isEqualTo(3);
AtomicInteger count = new AtomicInteger();
bucketList.forEach(stringTermsBucket -> {
if ("message".equals(stringTermsBucket.key())) {
count.getAndIncrement();
assertThat(stringTermsBucket.docCount()).isEqualTo(3);
}
if ("some".equals(stringTermsBucket.key())) {
count.getAndIncrement();
assertThat(stringTermsBucket.docCount()).isEqualTo(2);
}
if ("other".equals(stringTermsBucket.key())) {
count.getAndIncrement();
assertThat(stringTermsBucket.docCount()).isEqualTo(1);
}
});
assertThat(count.get()).isEqualTo(3);
}
}

View File

@ -15,11 +15,23 @@
*/
package org.springframework.data.elasticsearch.core;
import static org.assertj.core.api.Assertions.*;
import static org.elasticsearch.index.query.QueryBuilders.*;
import org.elasticsearch.index.query.InnerHitBuilder;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.collapse.CollapseBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.data.elasticsearch.core.query.BaseQueryBuilder;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.junit.jupiter.ReactiveElasticsearchRestTemplateConfiguration;
import org.springframework.data.elasticsearch.utils.IndexNameProvider;
import org.springframework.lang.Nullable;
import org.springframework.test.context.ContextConfiguration;
/**
@ -37,4 +49,55 @@ public class ReactiveElasticsearchERHLCIntegrationTests extends ReactiveElastics
return new IndexNameProvider("reactive-template-es7");
}
}
@Override
protected Query getTermsAggsQuery(String aggsName, String aggsField) {
return new NativeSearchQueryBuilder().withQuery(matchAllQuery())
.addAggregation(AggregationBuilders.terms("messages").field("message")).build();
}
@Override
protected BaseQueryBuilder<?, ?> getBuilderWithMatchAllQuery() {
return new NativeSearchQueryBuilder().withQuery(matchAllQuery());
}
@Override
protected BaseQueryBuilder<?, ?> getBuilderWithTermQuery(String field, String value) {
return new NativeSearchQueryBuilder().withQuery(termQuery(field, value));
}
@Override
protected Query getQueryWithCollapse(String collapseField, @Nullable String innerHits, @Nullable Integer size) {
CollapseBuilder collapseBuilder = new CollapseBuilder(collapseField);
if (innerHits != null) {
InnerHitBuilder innerHitBuilder = new InnerHitBuilder(innerHits);
if (size != null) {
innerHitBuilder.setSize(size);
}
collapseBuilder.setInnerHits(innerHitBuilder);
}
return new NativeSearchQueryBuilder().withQuery(matchAllQuery()).withCollapseBuilder(collapseBuilder).build();
}
@Override
protected Query queryWithIds(String... ids) {
return new NativeSearchQueryBuilder().withIds(ids).build();
}
@Override
protected <A extends AggregationContainer<?>> void assertThatAggregationsAreCorrect(A aggregationContainer) {
Aggregation aggregation = (Aggregation) aggregationContainer.aggregation();
assertThat(aggregation.getName()).isEqualTo("messages");
assertThat(aggregation instanceof ParsedStringTerms);
ParsedStringTerms parsedStringTerms = (ParsedStringTerms) aggregation;
assertThat(parsedStringTerms.getBuckets().size()).isEqualTo(3);
assertThat(parsedStringTerms.getBucketByKey("message").getDocCount()).isEqualTo(3);
assertThat(parsedStringTerms.getBucketByKey("some").getDocCount()).isEqualTo(2);
assertThat(parsedStringTerms.getBucketByKey("other").getDocCount()).isEqualTo(1);
}
}

View File

@ -24,9 +24,9 @@ import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.lang.Boolean;
import java.lang.Integer;
import java.lang.Long;
import java.lang.Object;
import java.net.ConnectException;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
@ -40,36 +40,26 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.json.JSONException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIf;
import org.skyscreamer.jsonassert.JSONAssert;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Version;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.NewElasticsearchClientDevelopment;
import org.springframework.data.elasticsearch.RestStatusException;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import org.springframework.data.elasticsearch.annotations.Mapping;
import org.springframework.data.elasticsearch.annotations.Setting;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.core.document.Explanation;
import org.springframework.data.elasticsearch.core.index.AliasAction;
import org.springframework.data.elasticsearch.core.index.AliasActionParameters;
@ -98,7 +88,7 @@ import org.springframework.util.StringUtils;
*/
@SuppressWarnings("SpringJavaAutowiredMembersInspection")
@SpringIntegrationTest
public abstract class ReactiveElasticsearchIntegrationTests implements NewElasticsearchClientDevelopment {
public abstract class ReactiveElasticsearchIntegrationTests {
@Autowired private ReactiveElasticsearchOperations operations;
@Autowired private IndexNameProvider indexNameProvider;
@ -118,28 +108,18 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
}
// endregion
protected abstract Query getTermsAggsQuery(String aggsName, String aggsField);
protected abstract BaseQueryBuilder<?, ?> getBuilderWithMatchAllQuery();
protected abstract BaseQueryBuilder<?, ?> getBuilderWithTermQuery(String field, String value);
protected abstract Query getQueryWithCollapse(String collapseField, @Nullable String innerHits,
@Nullable Integer size);
protected abstract Query queryWithIds(String... ids);
// region Tests
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-504
public void executeShouldProvideResource() {
Mono.from(operations.execute(ReactiveElasticsearchClient::ping)) //
.as(StepVerifier::create) //
.expectNext(true) //
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-504
public void executeShouldConvertExceptions() {
Mono.from(operations.execute(client -> {
throw new RuntimeException(new ConnectException("we're doomed"));
})) //
.as(StepVerifier::create) //
.expectError(DataAccessResourceFailureException.class) //
.verify();
}
@Test // DATAES-504
public void insertWithIdShouldWork() {
@ -157,7 +137,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-504
public void insertWithAutogeneratedIdShouldUpdateEntityId() {
@ -175,7 +154,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
return operations.exists(id, IndexCoordinates.of(index));
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-504
public void insertWithExplicitIndexNameShouldOverwriteMetadata() {
@ -290,7 +268,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-519
public void existsShouldReturnFalseWhenIndexDoesNotExist() {
@ -300,7 +277,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-504
public void existsShouldReturnTrueWhenFound() {
@ -313,7 +289,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-504
public void existsShouldReturnFalseWhenNotFound() {
@ -432,7 +407,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-595, DATAES-767
public void shouldThrowDataAccessExceptionWhenInvalidPreferenceForGivenCriteria() {
@ -445,6 +419,8 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
CriteriaQuery queryWithInvalidPreference = new CriteriaQuery(
new Criteria("message").contains("some").and("message").contains("message"));
queryWithInvalidPreference.setPreference("_only_nodes:oops");
// add a pageable to not use scrolling,otherwise the exception class does not match
queryWithInvalidPreference.setPageable(PageRequest.of(0, 10));
operations.search(queryWithInvalidPreference, SampleEntity.class) //
.as(StepVerifier::create) //
@ -498,7 +474,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-567
public void aggregateShouldReturnAggregations() {
@ -508,24 +483,17 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
index(sampleEntity1, sampleEntity2, sampleEntity3);
NativeSearchQuery query = new NativeSearchQueryBuilder().withQuery(matchAllQuery())
.addAggregation(AggregationBuilders.terms("messages").field("message")).build();
Query query = getTermsAggsQuery("messages", "message");
operations.aggregate(query, SampleEntity.class) //
.as(StepVerifier::create) //
.consumeNextWith(aggregationContainer -> {
Aggregation aggregation = ((ElasticsearchAggregation) aggregationContainer).aggregation();
assertThat(aggregation.getName()).isEqualTo("messages");
assertThat(aggregation instanceof ParsedStringTerms);
ParsedStringTerms parsedStringTerms = (ParsedStringTerms) aggregation;
assertThat(parsedStringTerms.getBuckets().size()).isEqualTo(3);
assertThat(parsedStringTerms.getBucketByKey("message").getDocCount()).isEqualTo(3);
assertThat(parsedStringTerms.getBucketByKey("some").getDocCount()).isEqualTo(2);
assertThat(parsedStringTerms.getBucketByKey("other").getDocCount()).isEqualTo(1);
assertThatAggregationsAreCorrect(aggregationContainer);
}).verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
protected abstract <A extends AggregationContainer<?>> void assertThatAggregationsAreCorrect(A aggregationContainer);
@Test // DATAES-567, DATAES-767
public void aggregateShouldErrorWhenIndexDoesNotExist() {
operations
@ -621,7 +589,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-519
public void deleteByQueryShouldReturnZeroWhenIndexDoesNotExist() {
@ -634,7 +601,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
}).verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-547
public void shouldDeleteAcrossIndex() {
@ -650,11 +616,9 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
operations.indexOps(thisIndex).refresh().then(operations.indexOps(thatIndex).refresh()).block();
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder() //
.withQuery(termQuery("message", "test")) //
.build();
Query query = getBuilderWithTermQuery("message", "test").build();
operations.delete(searchQuery, SampleEntity.class, IndexCoordinates.of(indexPrefix + '*')) //
operations.delete(query, SampleEntity.class, IndexCoordinates.of(indexPrefix + '*')) //
.map(ByQueryResponse::getDeleted) //
.as(StepVerifier::create) //
.expectNext(2L) //
@ -663,7 +627,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
operations.indexOps(thisIndex).delete().then(operations.indexOps(thatIndex).delete()).block();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-547
public void shouldDeleteAcrossIndexWhenNoMatchingDataPresent() {
@ -679,11 +642,9 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
operations.indexOps(thisIndex).refresh().then(operations.indexOps(thatIndex).refresh()).block();
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder() //
.withQuery(termQuery("message", "negative")) //
.build();
Query query = getBuilderWithTermQuery("message", "negative").build();
operations.delete(searchQuery, SampleEntity.class, IndexCoordinates.of(indexPrefix + '*')) //
operations.delete(query, SampleEntity.class, IndexCoordinates.of(indexPrefix + '*')) //
.map(ByQueryResponse::getDeleted) //
.as(StepVerifier::create) //
.expectNext(0L) //
@ -692,7 +653,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
operations.indexOps(thisIndex).delete().then(operations.indexOps(thatIndex).delete()).block();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-504
public void deleteByQueryShouldReturnNumberOfDeletedDocuments() {
@ -707,7 +667,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-504
public void deleteByQueryShouldReturnZeroIfNothingDeleted() {
@ -722,7 +681,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-593
public void shouldReturnDocumentWithCollapsedField() {
@ -734,28 +692,20 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
entity3.setRate(1);
index(entity1, entity2, entity3);
NativeSearchQuery query = new NativeSearchQueryBuilder() //
.withQuery(matchAllQuery()) //
.withCollapseField("rate") //
.withPageable(PageRequest.of(0, 25)) //
.build();
Query query = getQueryWithCollapse("rate", null, null);
operations.search(query, SampleEntity.class, IndexCoordinates.of(indexNameProvider.indexName())) //
.as(StepVerifier::create) //
.expectNextCount(2) //
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test
void shouldReturnSortFields() {
SampleEntity entity = randomEntity("test message");
entity.rate = 42;
index(entity);
NativeSearchQuery query = new NativeSearchQueryBuilder() //
.withQuery(matchAllQuery()) //
.withSort(new FieldSortBuilder("rate").order(SortOrder.DESC)) //
Query query = getBuilderWithMatchAllQuery().withSort(Sort.by(Sort.Direction.DESC, "rate"))
.build();
operations.search(query, SampleEntity.class) //
@ -763,12 +713,23 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
.consumeNextWith(it -> {
List<Object> sortValues = it.getSortValues();
assertThat(sortValues).hasSize(1);
assertThat(sortValues.get(0)).isEqualTo(42);
// old client returns Integer, new ElasticsearchClient String
java.lang.Object o = sortValues.get(0);
if (o instanceof Integer) {
Integer i = (Integer) o;
assertThat(o).isInstanceOf(Integer.class).isEqualTo(42);
} else if (o instanceof Long) {
Long l = (Long) o;
assertThat(o).isInstanceOf(Long.class).isEqualTo(42L);
} else if (o instanceof String) {
assertThat(o).isInstanceOf(String.class).isEqualTo("42");
} else {
fail("unexpected object type " + o);
}
}) //
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-623, #1678
public void shouldReturnObjectsForGivenIdsUsingMultiGet() {
SampleEntity entity1 = randomEntity("test message 1");
@ -788,7 +749,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-623
public void shouldReturnObjectsForGivenIdsUsingMultiGetWithFields() {
SampleEntity entity1 = randomEntity("test message 1");
@ -809,7 +769,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-623. #1678
public void shouldDoBulkUpdate() {
SampleEntity entity1 = randomEntity("test message 1");
@ -847,7 +806,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-623
void shouldSaveAll() {
SampleEntity entity1 = randomEntity("test message 1");
@ -858,7 +816,7 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
operations.saveAll(Mono.just(Arrays.asList(entity1, entity2)), IndexCoordinates.of(indexNameProvider.indexName())) //
.then().block();
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build();
Query searchQuery = operations.matchAllQuery();
operations.search(searchQuery, SampleEntity.class, IndexCoordinates.of(indexNameProvider.indexName())) //
.as(StepVerifier::create) //
.expectNextMatches(hit -> entity1.equals(hit.getContent()) || entity2.equals(hit.getContent())) //
@ -891,7 +849,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
assertThat(retrieved.seqNoPrimaryTerm.getPrimaryTerm()).isPositive();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-799, #1678
void multiGetShouldReturnSeqNoPrimaryTerm() {
OptimisticEntity original = new OptimisticEntity();
@ -910,7 +867,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
return new NativeSearchQueryBuilder().withIds(singletonList(id)).build();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-799
void searchShouldReturnSeqNoPrimaryTerm() {
OptimisticEntity original = new OptimisticEntity();
@ -927,10 +883,9 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
}
private Query searchQueryForOne(String id) {
return new NativeSearchQueryBuilder().withFilter(new IdsQueryBuilder().addIds(id)).build();
return queryWithIds(id);
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-799
void shouldThrowOptimisticLockingFailureExceptionWhenConcurrentUpdateOccursOnEntityWithSeqNoPrimaryTermProperty() {
OptimisticEntity original = new OptimisticEntity();
@ -950,7 +905,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
.verify();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-799
void shouldThrowOptimisticLockingFailureExceptionWhenConcurrentUpdateOccursOnVersionedEntityWithSeqNoPrimaryTermProperty() {
OptimisticAndVersionedEntity original = new OptimisticAndVersionedEntity();
@ -979,7 +933,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
operations.save(forEdit).as(StepVerifier::create).expectNextCount(1).verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-909
void shouldDoUpdate() {
SampleEntity entity = randomEntity("test message");
@ -1119,7 +1072,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
}).verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // #1646, #1718
@DisplayName("should return a list of info for specific index")
void shouldReturnInformationListOfAllIndices() {

View File

@ -116,6 +116,30 @@ class RequestFactoryTests {
assertThat(searchRequest.source().from()).isEqualTo(30);
}
@Test // DATAES-227
public void shouldUseUpsertOnUpdate() {
// given
Map<String, Object> doc = new HashMap<>();
doc.put("id", "1");
doc.put("message", "test");
org.springframework.data.elasticsearch.core.document.Document document = org.springframework.data.elasticsearch.core.document.Document
.from(doc);
UpdateQuery updateQuery = UpdateQuery.builder("1") //
.withDocument(document) //
.withUpsert(document) //
.build();
// when
UpdateRequest request = requestFactory.updateRequest(updateQuery, IndexCoordinates.of("index"));
// then
assertThat(request).isNotNull();
assertThat(request.upsertRequest()).isNotNull();
}
@Test // DATAES-693
public void shouldReturnSourceWhenRequested() {
// given

View File

@ -21,11 +21,15 @@ import co.elastic.clients.elasticsearch._types.aggregations.Aggregate;
import co.elastic.clients.elasticsearch._types.aggregations.Aggregation;
import co.elastic.clients.elasticsearch._types.aggregations.StatsBucketAggregate;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.data.elasticsearch.ELCQueries;
import org.springframework.data.elasticsearch.client.elc.ElasticsearchAggregation;
import org.springframework.data.elasticsearch.client.elc.ElasticsearchAggregations;
import org.springframework.data.elasticsearch.client.elc.NativeQuery;
import org.springframework.data.elasticsearch.client.elc.QueryBuilders;
@ -55,18 +59,17 @@ public class AggregationELCIntegrationTests extends AggregationIntegrationTests
@Override
protected Query getTermsAggsQuery(String aggsName, String aggsField) {
return NativeQuery.builder() //
.withQuery(QueryBuilders.matchAllQueryAsQuery()) //
.withAggregation(aggsName, Aggregation.of(a -> a //
.terms(ta -> ta.field(aggsField)))) //
.withMaxResults(0) //
.build();
return ELCQueries.getTermsAggsQuery(aggsName, aggsField);
}
@Override
protected void assertThatAggsHasResult(AggregationsContainer<?> aggregationsContainer, String aggsName) {
Map<String, Aggregate> aggregations = ((ElasticsearchAggregations) aggregationsContainer).aggregations();
assertThat(aggregations).containsKey(aggsName);
List<ElasticsearchAggregation> aggregations = ((ElasticsearchAggregations) aggregationsContainer).aggregations();
List<String> aggNames = aggregations.stream() //
.map(ElasticsearchAggregation::aggregation) //
.map(org.springframework.data.elasticsearch.client.elc.Aggregation::getName) //
.collect(Collectors.toList());
assertThat(aggNames).contains(aggsName);
}
@Override
@ -84,9 +87,13 @@ public class AggregationELCIntegrationTests extends AggregationIntegrationTests
@Override
protected void assertThatPipelineAggsAreCorrect(AggregationsContainer<?> aggregationsContainer, String aggsName,
String pipelineAggsName) {
Map<String, Aggregate> aggregations = ((ElasticsearchAggregations) aggregationsContainer).aggregations();
assertThat(aggregations).containsKey(aggsName);
Aggregate aggregate = aggregations.get(pipelineAggsName);
Map<String, Aggregate> aggregates = ((ElasticsearchAggregations) aggregationsContainer).aggregations().stream() //
.map(ElasticsearchAggregation::aggregation) //
.collect(Collectors.toMap(org.springframework.data.elasticsearch.client.elc.Aggregation::getName,
org.springframework.data.elasticsearch.client.elc.Aggregation::getAggregate));
assertThat(aggregates).containsKey(aggsName);
Aggregate aggregate = aggregates.get(pipelineAggsName);
assertThat(aggregate.isStatsBucket()).isTrue();
StatsBucketAggregate statsBucketAggregate = aggregate.statsBucket();
assertThat(statsBucketAggregate.min()).isEqualTo(1.0);

View File

@ -234,7 +234,7 @@ public abstract class AggregationIntegrationTests {
*/
static class ArticleEntityBuilder {
private ArticleEntity result;
private final ArticleEntity result;
public ArticleEntityBuilder(String id) {
result = new ArticleEntity(id);

View File

@ -116,7 +116,7 @@ public abstract class CompletionIntegrationTests implements NewElasticsearchClie
operations.bulkIndex(indexQueries, AnnotatedCompletionEntity.class);
}
@DisabledIf("newElasticsearchClient") // todo #1973, ES issue 150
@DisabledIf(value = "newElasticsearchClient", disabledReason="todo #1973, ES issue 150")
@Test
public void shouldFindSuggestionsForGivenCriteriaQueryUsingCompletionEntity() {
@ -148,7 +148,7 @@ public abstract class CompletionIntegrationTests implements NewElasticsearchClie
operations.get("1", CompletionEntity.class);
}
@DisabledIf("newElasticsearchClient") // todo #1973, ES issue 150
@DisabledIf(value = "newElasticsearchClient", disabledReason="todo #1973, ES issue 150")
@Test
public void shouldFindSuggestionsForGivenCriteriaQueryUsingAnnotatedCompletionEntity() {
@ -172,7 +172,7 @@ public abstract class CompletionIntegrationTests implements NewElasticsearchClie
assertThat(options.get(1).getText()).isIn("Marchand", "Mohsin");
}
@DisabledIf("newElasticsearchClient") // todo #1973, ES issue 150
@DisabledIf(value = "newElasticsearchClient", disabledReason="todo #1973, ES issue 150")
@Test
public void shouldFindSuggestionsWithWeightsForGivenCriteriaQueryUsingAnnotatedCompletionEntity() {

View File

@ -67,7 +67,7 @@ public abstract class ReactiveSuggestIntegrationTests implements NewElasticsearc
operations.indexOps(IndexCoordinates.of(indexNameProvider.getPrefix() + "*")).delete().block();
}
@DisabledIf("newElasticsearchClient") // todo #1973, ES issue 150
@DisabledIf(value = "newElasticsearchClient", disabledReason="todo #1973, ES issue 150")
@Test // #1302
@DisplayName("should find suggestions for given prefix completion")
void shouldFindSuggestionsForGivenPrefixCompletion() {

View File

@ -0,0 +1,44 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository.support;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.data.elasticsearch.junit.jupiter.ElasticsearchTemplateConfiguration;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
import org.springframework.data.elasticsearch.utils.IndexNameProvider;
import org.springframework.test.context.ContextConfiguration;
/**
* @author Peter-Josef Meisch
* @since 4.4
*/
@ContextConfiguration(classes = {ElasticsearchRepositoryELCIntegrationTests.Config.class })
public class ElasticsearchRepositoryELCIntegrationTests extends ElasticsearchRepositoryIntegrationTests {
@Configuration
@Import({ElasticsearchTemplateConfiguration.class })
@EnableElasticsearchRepositories(basePackages = {"org.springframework.data.elasticsearch.repository.support" },
considerNestedRepositories = true)
static class Config {
@Bean
IndexNameProvider indexNameProvider() {
return new IndexNameProvider("repository");
}
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository.support;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.data.elasticsearch.junit.jupiter.ElasticsearchRestTemplateConfiguration;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
import org.springframework.data.elasticsearch.utils.IndexNameProvider;
import org.springframework.test.context.ContextConfiguration;
/**
* @author Peter-Josef Meisch
* @since 4.4
*/
@ContextConfiguration(classes = {ElasticsearchRepositoryERHLCIntegrationTests.Config.class })
public class ElasticsearchRepositoryERHLCIntegrationTests extends ElasticsearchRepositoryIntegrationTests {
@Configuration
@Import({ElasticsearchRestTemplateConfiguration.class })
@EnableElasticsearchRepositories(basePackages = {"org.springframework.data.elasticsearch.repository.support" },
considerNestedRepositories = true)
static class Config {
@Bean
IndexNameProvider indexNameProvider() {
return new IndexNameProvider("repository-es7");
}
}
}

View File

@ -17,12 +17,9 @@ package org.springframework.data.elasticsearch.repository.support;
import static org.assertj.core.api.Assertions.*;
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.springframework.data.elasticsearch.annotations.FieldType.*;
import static org.springframework.data.elasticsearch.utils.IdGenerator.*;
import java.io.IOException;
import java.lang.Long;
import java.lang.Object;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -30,12 +27,10 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.junit.jupiter.api.AfterEach;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Version;
@ -45,18 +40,16 @@ import org.springframework.data.domain.Sort;
import org.springframework.data.domain.Sort.Order;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.IndexOperations;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.junit.jupiter.ElasticsearchRestTemplateConfiguration;
import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
import org.springframework.data.elasticsearch.utils.IndexInitializer;
import org.springframework.data.elasticsearch.utils.IndexNameProvider;
import org.springframework.data.util.StreamUtils;
import org.springframework.lang.Nullable;
import org.springframework.test.context.ContextConfiguration;
/**
* @author Rizwan Idrees
@ -68,29 +61,22 @@ import org.springframework.test.context.ContextConfiguration;
* @author Murali Chevuri
*/
@SpringIntegrationTest
@ContextConfiguration(classes = { SimpleElasticsearchRepositoryIntegrationTests.Config.class })
class SimpleElasticsearchRepositoryIntegrationTests {
@Configuration
@Import({ ElasticsearchRestTemplateConfiguration.class })
@EnableElasticsearchRepositories(basePackages = { "org.springframework.data.elasticsearch.repository.support" },
considerNestedRepositories = true)
static class Config {}
abstract class ElasticsearchRepositoryIntegrationTests {
@Autowired private SampleElasticsearchRepository repository;
@Autowired private ElasticsearchOperations operations;
private IndexOperations indexOperations;
@Autowired private IndexNameProvider indexNameProvider;
@BeforeEach
void before() {
indexOperations = operations.indexOps(SampleEntity.class);
IndexInitializer.init(indexOperations);
indexNameProvider.increment();
operations.indexOps(SampleEntity.class).createWithMapping();
}
@AfterEach
void after() {
indexOperations.delete();
@Test
@org.junit.jupiter.api.Order(Integer.MAX_VALUE)
public void cleanup() {
operations.indexOps(IndexCoordinates.of(indexNameProvider.getPrefix() + "*")).delete();
}
@Test
@ -369,7 +355,7 @@ class SimpleElasticsearchRepositoryIntegrationTests {
repository.deleteAllById(Arrays.asList(id1, id3));
// then
assertThat(repository.findAll()).extracting(SampleEntity::getId).containsExactly(id2);
Assertions.assertThat(repository.findAll()).extracting(SampleEntity::getId).containsExactly(id2);
}
@Test
@ -539,8 +525,8 @@ class SimpleElasticsearchRepositoryIntegrationTests {
repository.deleteAll(sampleEntities);
// then
assertThat(repository.findById(documentId1)).isNotPresent();
assertThat(repository.findById(documentId2)).isNotPresent();
Assertions.assertThat(repository.findById(documentId1)).isNotPresent();
Assertions.assertThat(repository.findById(documentId2)).isNotPresent();
}
@Test
@ -677,14 +663,14 @@ class SimpleElasticsearchRepositoryIntegrationTests {
return sampleEntities;
}
@Document(indexName = "test-index-sample-simple-repository")
@Document(indexName = "#{@indexNameProvider.indexName()}")
static class SampleEntity {
@Nullable
@Id private String id;
@Nullable
@Field(type = Text, store = true, fielddata = true) private String type;
@Field(type = FieldType.Text, store = true, fielddata = true) private String type;
@Nullable
@Field(type = Text, store = true, fielddata = true) private String message;
@Field(type = FieldType.Text, store = true, fielddata = true) private String message;
@Nullable private int rate;
@Nullable private boolean available;
@Nullable

View File

@ -67,6 +67,7 @@ import org.springframework.test.context.ContextConfiguration;
* @author Peter-Josef Meisch
* @author Jens Schauder
*/
// todo #1973 test for both clients
@SpringIntegrationTest
@ContextConfiguration(classes = { SimpleReactiveElasticsearchRepositoryTests.Config.class })
class SimpleReactiveElasticsearchRepositoryTests {