Compare commits

...

11 Commits

Author SHA1 Message Date
Christoph Strobl
16dacbb63c
Release version 4.4 RC1 (2021.2.0).
See #2120
2022-04-19 11:10:55 +02:00
Christoph Strobl
12acddb86d
Prepare 4.4 RC1 (2021.2.0).
See #2120
2022-04-19 11:10:15 +02:00
Peter-Josef Meisch
8cef50347e
Add more implementations using the new client.
Original Pull Request #2136
See #1973
2022-04-13 22:12:02 +02:00
Peter-Josef Meisch
ea4d3f9f30
Upgrade to Elasticsearch 7.17.2.
Original Pull Request #2131
Closes #2130

(cherry picked from commit a60f3059e168ac48b6dc9f97d3d3b14ad1837903)
2022-04-02 21:15:17 +02:00
Peter-Josef Meisch
b9e2b13f21
Add info to readme about Elasticsearch versions.
Original Pull Request #2128
Closes #2127

(cherry picked from commit 3154c74f941a8a433299ad43619c8bb0cccaf2fe)
2022-04-02 18:22:54 +02:00
Peter-Josef Meisch
5549216db0
Default Refresh policy for ReactiveElasticsearchTemplate.
Original Pull Request #2124
Closes #2110

(cherry picked from commit acd7990fac8a07a3816c389849981fd176fd63ab)
2022-03-24 21:11:25 +01:00
Mark Paluch
a2cee9defd
Update Jenkinsfile according to Java 8 build pipeline configuration.
See #2120
2022-03-24 09:30:20 +01:00
Greg L. Turnquist
ea0ac3f7bc
After release cleanups.
See #2092
2022-03-21 10:20:33 -05:00
Greg L. Turnquist
49cb56ed0c
Prepare next development iteration.
See #2092
2022-03-21 10:20:31 -05:00
Greg L. Turnquist
e8f9f9f1e3
Release version 4.4 M4 (2021.2.0).
See #2092
2022-03-21 10:09:34 -05:00
Greg L. Turnquist
f91c9c443b
Prepare 4.4 M4 (2021.2.0).
See #2092
2022-03-21 10:07:09 -05:00
52 changed files with 1826 additions and 541 deletions

11
Jenkinsfile vendored
View File

@ -9,7 +9,7 @@ pipeline {
triggers {
pollSCM 'H/10 * * * *'
upstream(upstreamProjects: "spring-data-commons/main", threshold: hudson.model.Result.SUCCESS)
upstream(upstreamProjects: "spring-data-commons/2.7.x", threshold: hudson.model.Result.SUCCESS)
}
options {
@ -20,8 +20,9 @@ pipeline {
stages {
stage("test: baseline (main)") {
when {
beforeAgent(true)
anyOf {
branch 'main'
branch(pattern: "main|(\\d\\.\\d\\.x)", comparator: "REGEXP")
not { triggeredBy 'UpstreamCause' }
}
}
@ -50,8 +51,9 @@ pipeline {
stage("Test other configurations") {
when {
beforeAgent(true)
allOf {
branch 'main'
branch(pattern: "main|(\\d\\.\\d\\.x)", comparator: "REGEXP")
not { triggeredBy 'UpstreamCause' }
}
}
@ -108,8 +110,9 @@ pipeline {
stage('Release to artifactory') {
when {
beforeAgent(true)
anyOf {
branch 'main'
branch(pattern: "main|(\\d\\.\\d\\.x)", comparator: "REGEXP")
not { triggeredBy 'UpstreamCause' }
}
}

View File

@ -1,3 +1,4 @@
image:https://spring.io/badges/spring-data-elasticsearch/ga.svg[Spring Data Elasticsearch,link=https://projects.spring.io/spring-data-elasticsearch#quick-start] image:https://spring.io/badges/spring-data-elasticsearch/snapshot.svg[Spring Data Elasticsearch,link=https://projects.spring.io/spring-data-elasticsearch#quick-start]
= Spring Data for Elasticsearch image:https://jenkins.spring.io/buildStatus/icon?job=spring-data-elasticsearch%2Fmain&subject=Build[link=https://jenkins.spring.io/view/SpringData/job/spring-data-elasticsearch/] https://gitter.im/spring-projects/spring-data[image:https://badges.gitter.im/spring-projects/spring-data.svg[Gitter]]
@ -12,13 +13,35 @@ This project is lead and maintained by the community.
== Features
* Spring configuration support using Java based `@Configuration` classes or an XML namespace for a ES clients instances.
* `ElasticsearchRestTemplate` helper class that increases productivity performing common ES operations.
* `ElasticsearchOperations` class and implementations that increases productivity performing common ES operations.
Includes integrated object mapping between documents and POJOs.
* Feature Rich Object Mapping integrated with Springs Conversion Service
* Annotation based mapping metadata
* Automatic implementation of `Repository` interfaces including support for custom search methods.
* CDI support for repositories
== About Elasticsearch versions and clients
=== Elasticsearch 7.17 client libraries
At the end of 2021 Elasticsearch with version 7.17 released the new version of their Java client and deprecated the `RestHighLevelCLient` which was the default way to access Elasticsearch up to then.
Spring Data Elasticsearch will in version 4.4 offer the possibility to optionally use the new client as an alternative to the existing setup using the `RestHighLevelCLient`.
The default client that is used still is the `RestHighLevelCLient`, first because the integration of the new client is not yet complete, the new client still has features missing and bugs which will hopefully be resolved soon.
Second, and more important, the new Elasticsearch client forces users to switch from using `javax.json.spi.JsonProvider` to `jakarta.json.spi.JsonProvider`.
Spring Data Elasticsearch cannot enforce this switch; Spring Boot will switch to `jakarta` with version 3 and then it's safe for Spring Data Elasticsearch to switch to the new client.
So for version 4.4 Spring Data Elasticsearch will keep using the `RestHighLevelCLient` in version 7.17.x (as long as this will be available).
=== Elasticsearch 8 client libraries
In Elasticsearch 8, the `RestHighLevelCLient` has been removed.
This means that a switch to this client version can only be done with the next major upgrade which will be Spring Data Elasticsearch 5, based on Spring Data 3, used by Spring Boot 3, based on Spring 6 and Java 17.
=== Elasticsearch 8 cluster
It should be possible to use the Elasticsearch 7 client to access a cluster running version 8 by setting the appropriate aompatibility headers (see the documentation at https://docs.spring.io/spring-data/elasticsearch/docs/current/reference/html/#elasticsearch.clients.configuration). but I encountered and heard of cases where the response from the server is not parseable by the client although the headers are set, so use with care.
== Code of Conduct
This project is governed by the https://github.com/spring-projects/.github/blob/e3cc2ff230d8f1dca06535aa6b5a4a23815861d4/CODE_OF_CONDUCT.md[Spring Code of Conduct].

16
pom.xml
View File

@ -5,12 +5,12 @@
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-elasticsearch</artifactId>
<version>4.4.0-SNAPSHOT</version>
<version>4.4.0-RC1</version>
<parent>
<groupId>org.springframework.data.build</groupId>
<artifactId>spring-data-parent</artifactId>
<version>2.7.0-SNAPSHOT</version>
<version>2.7.0-RC1</version>
</parent>
<name>Spring Data Elasticsearch</name>
@ -18,11 +18,11 @@
<url>https://github.com/spring-projects/spring-data-elasticsearch</url>
<properties>
<elasticsearch>7.17.1</elasticsearch>
<elasticsearch-java>7.17.1</elasticsearch-java>
<elasticsearch>7.17.2</elasticsearch>
<elasticsearch-java>7.17.2</elasticsearch-java>
<log4j>2.17.1</log4j>
<netty>4.1.65.Final</netty>
<springdata.commons>2.7.0-SNAPSHOT</springdata.commons>
<springdata.commons>2.7.0-RC1</springdata.commons>
<testcontainers>1.16.2</testcontainers>
<blockhound-junit>1.0.6.RELEASE</blockhound-junit>
<java-module-name>spring.data.elasticsearch</java-module-name>
@ -497,15 +497,15 @@
<repositories>
<repository>
<id>spring-libs-snapshot</id>
<url>https://repo.spring.io/libs-snapshot</url>
<id>spring-libs-milestone</id>
<url>https://repo.spring.io/libs-milestone</url>
</repository>
<repository>
<id>local-maven-repo</id>
<url>file:///${project.basedir}/src/test/resources/local-maven-repo</url>
</repository>
</repositories>
<pluginRepositories>

View File

@ -34,7 +34,7 @@ The following table shows the Elasticsearch versions that are used by Spring Dat
[cols="^,^,^,^,^",options="header"]
|===
| Spring Data Release Train | Spring Data Elasticsearch | Elasticsearch | Spring Framework | Spring Boot
| 2022.0 (Raj) | 4.4.x | 7.17.1 | 5.3.x | 2.7.x
| 2022.0 (Raj) | 4.4.x | 7.17.2 | 5.3.x | 2.7.x
| 2021.1 (Q) | 4.3.x | 7.15.2 | 5.3.x | 2.6.x
| 2021.0 (Pascal) | 4.2.x | 7.12.0 | 5.3.x | 2.5.x
| 2020.0 (Ockham)footnote:oom[Out of maintenance] | 4.1.xfootnote:oom[] | 7.9.3 | 5.3.2 | 2.4.x

View File

@ -28,6 +28,14 @@ Connections to Elasticsearch must be made using either the imperative `Elasticse
In 4.3 two classes (`ElasticsearchAggregations` and `ElasticsearchAggregation`) had been moved to the `org.springframework.data.elasticsearch.core.clients.elasticsearch7` package in preparation for the integration of the new Elasticsearch client.
The were moved back to the `org.springframework.data.elasticsearch.core` package as we keep the classes use the old Elasticsearch client where they were.
=== Behaviour change
The `ReactiveElasticsearchTemplate`, when created directly or by Spring Boot configuration had a default refresh policy of IMMEDIATE.
This could cause performance issues on heavy indexing and was different than the default behaviour of Elasticsearch.
This has been changed to that now the default refresh policy is NONE.
When the
`ReactiveElasticsearchTemplate` was provided by using the configuration like described in <<elasticsearch.clients.reactive>> the default refresh policy already was set to NONE.
[[elasticsearch-migration-guide-4.3-4.4.new-clients]]
== New Elasticsearch client
@ -74,7 +82,7 @@ The dependencies for the new Elasticsearch client are still optional in Spring D
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>7.17.1</version>
<version>7.17.2</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
@ -85,7 +93,7 @@ The dependencies for the new Elasticsearch client are still optional in Spring D
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId> <!-- is Apache 2-->
<version>7.17.1</version>
<version>7.17.2</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>

View File

@ -5,7 +5,7 @@
== New in Spring Data Elasticsearch 4.4
* Introduction of new imperative and reactive clients using the classes from the new Elasticsearch Java client
* Upgrade to Elasticsearch 7.17.1.
* Upgrade to Elasticsearch 7.17.2.
[[new-features.4-3-0]]
== New in Spring Data Elasticsearch 4.3

View File

@ -25,6 +25,14 @@ public class NoSuchIndexException extends NonTransientDataAccessResourceExceptio
private final String index;
/**
* @since 4.4
*/
public NoSuchIndexException(String index) {
super(String.format("Index %s not found.", index));
this.index = index;
}
public NoSuchIndexException(String index, Throwable cause) {
super(String.format("Index %s not found.", index), cause);
this.index = index;

View File

@ -0,0 +1,44 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client.elc;
import co.elastic.clients.elasticsearch._types.aggregations.Aggregate;
/**
* Class to combine an Elasticsearch {@link co.elastic.clients.elasticsearch._types.aggregations.Aggregate} with its
* name. Necessary as the Elasticsearch Aggregate does not know i"s name.
*
* @author Peter-Josef Meisch
* @since 4.4
*/
public class Aggregation {
private final String name;
private final Aggregate aggregate;
public Aggregation(String name, Aggregate aggregate) {
this.name = name;
this.aggregate = aggregate;
}
public String getName() {
return name;
}
public Aggregate getAggregate() {
return aggregate;
}
}

View File

@ -28,6 +28,7 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
@ -77,18 +78,12 @@ final class DocumentAdapters {
NestedMetaData nestedMetaData = from(hit.nested());
// todo #1973 explanation
Explanation explanation = from(hit.explanation());
// todo #1973 matchedQueries
List<String> matchedQueries = null;
// todo #1973 documentFields
Map<String, List<Object>> documentFields = Collections.emptyMap();
Document document;
Object source = hit.source();
if (source == null) {
// Elasticsearch provides raw JsonData, so we build the fields into a JSON string
Function<Map<String, JsonData>, EntityAsMap> fromFields = fields -> {
StringBuilder sb = new StringBuilder("{");
final boolean[] firstField = { true };
hit.fields().forEach((key, jsonData) -> {
@ -100,7 +95,25 @@ final class DocumentAdapters {
firstField[0] = false;
});
sb.append('}');
document = Document.parse(sb.toString());
return new EntityAsMap().fromJson(sb.toString());
};
EntityAsMap hitFieldsAsMap = fromFields.apply(hit.fields());
Map<String, List<Object>> documentFields = new LinkedHashMap<>();
hitFieldsAsMap.entrySet().forEach(entry -> {
if (entry.getValue() instanceof List) {
// noinspection unchecked
documentFields.put(entry.getKey(), (List<Object>) entry.getValue());
} else {
documentFields.put(entry.getKey(), Collections.singletonList(entry.getValue()));
}
});
Document document;
Object source = hit.source();
if (source == null) {
document = Document.from(hitFieldsAsMap);
} else {
if (source instanceof EntityAsMap) {
document = Document.from((EntityAsMap) source);

View File

@ -0,0 +1,37 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.client.elc;
import org.springframework.data.elasticsearch.core.AggregationContainer;
/**
* {@link AggregationContainer} for a {@link Aggregation} that holds Elasticsearch data.
* @author Peter-Josef Meisch
* @since 4.4
*/
public class ElasticsearchAggregation implements AggregationContainer<Aggregation> {
private final Aggregation aggregation;
public ElasticsearchAggregation(Aggregation aggregation) {
this.aggregation = aggregation;
}
@Override
public Aggregation aggregation() {
return aggregation;
}
}

View File

@ -17,9 +17,12 @@ package org.springframework.data.elasticsearch.client.elc;
import co.elastic.clients.elasticsearch._types.aggregations.Aggregate;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.springframework.data.elasticsearch.core.AggregationsContainer;
import org.springframework.util.Assert;
/**
* AggregationsContainer implementation for the Elasticsearch aggregations.
@ -27,16 +30,33 @@ import org.springframework.data.elasticsearch.core.AggregationsContainer;
* @author Peter-Josef Meisch
* @since 4.4
*/
public class ElasticsearchAggregations implements AggregationsContainer<Map<String, Aggregate>> {
public class ElasticsearchAggregations implements AggregationsContainer<List<ElasticsearchAggregation>> {
private final Map<String, Aggregate> aggregations;
private final List<ElasticsearchAggregation> aggregations;
public ElasticsearchAggregations(List<ElasticsearchAggregation> aggregations) {
Assert.notNull(aggregations, "aggregations must not be null");
public ElasticsearchAggregations(Map<String, Aggregate> aggregations) {
this.aggregations = aggregations;
}
/**
* convenience constructor taking a map as it is returned from the new Elasticsearch client.
*
* @param aggregationsMap aggregate map
*/
public ElasticsearchAggregations(Map<String, Aggregate> aggregationsMap) {
Assert.notNull(aggregationsMap, "aggregationsMap must not be null");
aggregations = new ArrayList<>(aggregationsMap.size());
aggregationsMap
.forEach((name, aggregate) -> aggregations.add(new ElasticsearchAggregation(new Aggregation(name, aggregate))));
}
@Override
public Map<String, Aggregate> aggregations() {
public List<ElasticsearchAggregation> aggregations() {
return aggregations;
}
}

View File

@ -20,13 +20,18 @@ import co.elastic.clients.elasticsearch._types.ErrorResponse;
import co.elastic.clients.json.JsonpMapper;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.elasticsearch.client.ResponseException;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.dao.support.PersistenceExceptionTranslator;
import org.springframework.data.elasticsearch.RestStatusException;
import org.springframework.data.elasticsearch.NoSuchIndexException;
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
import org.springframework.http.HttpStatus;
/**
* Simple {@link PersistenceExceptionTranslator} for Elasticsearch. Convert the given runtime exception to an
@ -67,14 +72,28 @@ public class ElasticsearchExceptionTranslator implements PersistenceExceptionTra
return new OptimisticLockingFailureException("Cannot index a document due to seq_no+primary_term conflict", ex);
}
// todo #1973 index unavailable?
if (ex instanceof ElasticsearchException) {
ElasticsearchException elasticsearchException = (ElasticsearchException) ex;
ErrorResponse response = elasticsearchException.response();
if (response.status() == HttpStatus.NOT_FOUND.value()
&& "index_not_found_exception".equals(response.error().type())) {
Pattern pattern = Pattern.compile(".*no such index \\[(.*)\\]");
String index = "";
Matcher matcher = pattern.matcher(response.error().reason());
if (matcher.matches()) {
index = matcher.group(1);
}
return new NoSuchIndexException(index);
}
String body = JsonUtils.toJson(response, jsonpMapper);
if (response.error().type().contains("validation_exception")) {
return new DataIntegrityViolationException(response.error().reason());
}
return new UncategorizedElasticsearchException(ex.getMessage(), response.status(), body, ex);
}
@ -86,20 +105,22 @@ public class ElasticsearchExceptionTranslator implements PersistenceExceptionTra
return null;
}
private boolean isSeqNoConflict(Exception exception) {
private boolean isSeqNoConflict(Throwable exception) {
// todo #1973 check if this works
Integer status = null;
String message = null;
if (exception instanceof RestStatusException) {
RestStatusException statusException = (RestStatusException) exception;
status = statusException.getStatus();
message = statusException.getMessage();
if (exception instanceof ResponseException) {
ResponseException responseException = (ResponseException) exception;
status = responseException.getResponse().getStatusLine().getStatusCode();
message = responseException.getMessage();
} else if (exception.getCause() != null) {
return isSeqNoConflict(exception.getCause());
}
if (status != null && message != null) {
return status == 409 && message.contains("type=version_conflict_engine_exception")
return status == 409 && message.contains("type\":\"version_conflict_engine_exception")
&& message.contains("version conflict, required seqNo");
}

View File

@ -15,10 +15,13 @@
*/
package org.springframework.data.elasticsearch.client.elc;
import static org.springframework.data.elasticsearch.client.elc.TypeUtils.*;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import co.elastic.clients.elasticsearch.core.msearch.MultiSearchResponseItem;
import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.transport.Version;
@ -40,6 +43,7 @@ import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.SearchScrollHits;
import org.springframework.data.elasticsearch.core.cluster.ClusterOperations;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.BulkOptions;
@ -137,7 +141,12 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate {
@Override
public void bulkUpdate(List<UpdateQuery> queries, BulkOptions bulkOptions, IndexCoordinates index) {
throw new UnsupportedOperationException("not implemented");
Assert.notNull(queries, "queries must not be null");
Assert.notNull(bulkOptions, "bulkOptions must not be null");
Assert.notNull(index, "index must not be null");
doBulkOperation(queries, bulkOptions, index);
}
@Override
@ -155,12 +164,25 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate {
@Override
public UpdateResponse update(UpdateQuery updateQuery, IndexCoordinates index) {
throw new UnsupportedOperationException("not implemented");
UpdateRequest<Document, ?> request = requestConverter.documentUpdateRequest(updateQuery, index, getRefreshPolicy(),
routingResolver.getRouting());
co.elastic.clients.elasticsearch.core.UpdateResponse<Document> response = execute(
client -> client.update(request, Document.class));
return UpdateResponse.of(result(response.result()));
}
@Override
public ByQueryResponse updateByQuery(UpdateQuery updateQuery, IndexCoordinates index) {
throw new UnsupportedOperationException("not implemented");
Assert.notNull(updateQuery, "updateQuery must not be null");
Assert.notNull(index, "index must not be null");
UpdateByQueryRequest request = requestConverter.documentUpdateByQueryRequest(updateQuery, index,
getRefreshPolicy());
UpdateByQueryResponse byQueryResponse = execute(client -> client.updateByQuery(request));
return responseConverter.byQueryResponse(byQueryResponse);
}
@Override
@ -404,14 +426,52 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate {
return doMultiSearch(multiSearchQueryParameters);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private List<SearchHits<?>> doMultiSearch(List<MultiSearchQueryParameter> multiSearchQueryParameters) {
throw new UnsupportedOperationException("not implemented");
MsearchRequest request = requestConverter.searchMsearchRequest(multiSearchQueryParameters);
MsearchResponse<EntityAsMap> msearchResponse = execute(client -> client.msearch(request, EntityAsMap.class));
List<MultiSearchResponseItem<EntityAsMap>> responseItems = msearchResponse.responses();
Assert.isTrue(multiSearchQueryParameters.size() == responseItems.size(),
"number of response items does not match number of requests");
List<SearchHits<?>> searchHitsList = new ArrayList<>(multiSearchQueryParameters.size());
Iterator<MultiSearchQueryParameter> queryIterator = multiSearchQueryParameters.iterator();
Iterator<MultiSearchResponseItem<EntityAsMap>> responseIterator = responseItems.iterator();
while (queryIterator.hasNext()) {
MultiSearchQueryParameter queryParameter = queryIterator.next();
MultiSearchResponseItem<EntityAsMap> responseItem = responseIterator.next();
// if responseItem kind is Result then responsItem.value is a MultiSearchItem which is derived from SearchResponse
if (responseItem.isResult()) {
Class clazz = queryParameter.clazz;
ReadDocumentCallback<?> documentCallback = new ReadDocumentCallback<>(elasticsearchConverter, clazz,
queryParameter.index);
SearchDocumentResponseCallback<SearchHits<?>> callback = new ReadSearchDocumentResponseCallback<>(clazz,
queryParameter.index);
SearchHits<?> searchHits = callback.doWith(
SearchDocumentResponseBuilder.from(responseItem.result(), getEntityCreator(documentCallback), jsonpMapper));
searchHitsList.add(searchHits);
} else {
// todo #1973 add failure
}
}
return searchHitsList;
}
/**
* value class combining the information needed for a single query in a multisearch request.
*/
private static class MultiSearchQueryParameter {
static class MultiSearchQueryParameter {
final Query query;
final Class<?> clazz;
final IndexCoordinates index;

View File

@ -17,14 +17,18 @@ package org.springframework.data.elasticsearch.client.elc;
import co.elastic.clients.elasticsearch._types.aggregations.Aggregation;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.core.search.FieldCollapse;
import co.elastic.clients.elasticsearch.core.search.Suggester;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.springframework.data.elasticsearch.core.query.BaseQuery;
import org.springframework.data.elasticsearch.core.query.RescorerQuery;
import org.springframework.data.elasticsearch.core.query.ScriptedField;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* A {@link org.springframework.data.elasticsearch.core.query.Query} implementation using query builders from the new
@ -36,13 +40,23 @@ import org.springframework.util.Assert;
public class NativeQuery extends BaseQuery {
@Nullable private final Query query;
@Nullable private Query filter;
// note: the new client does not have pipeline aggs, these are just set up as normal aggs
private final Map<String, Aggregation> aggregations = new LinkedHashMap<>();
@Nullable private Suggester suggester;
@Nullable private FieldCollapse fieldCollapse;
private List<ScriptedField> scriptedFields = Collections.emptyList();
private List<RescorerQuery> rescorerQueries = Collections.emptyList();
public NativeQuery(NativeQueryBuilder builder) {
super(builder);
this.query = builder.getQuery();
this.filter = builder.getFilter();
this.aggregations.putAll(builder.getAggregations());
this.suggester = builder.getSuggester();
this.fieldCollapse = builder.getFieldCollapse();
this.scriptedFields = builder.getScriptedFields();
this.rescorerQueries = builder.getRescorerQueries();
}
public NativeQuery(@Nullable Query query) {
@ -58,20 +72,9 @@ public class NativeQuery extends BaseQuery {
return query;
}
public void addAggregation(String name, Aggregation aggregation) {
Assert.notNull(name, "name must not be null");
Assert.notNull(aggregation, "aggregation must not be null");
aggregations.put(name, aggregation);
}
public void setAggregations(Map<String, Aggregation> aggregations) {
Assert.notNull(aggregations, "aggregations must not be null");
this.aggregations.clear();
this.aggregations.putAll(aggregations);
@Nullable
public Query getFilter() {
return filter;
}
public Map<String, Aggregation> getAggregations() {
@ -83,8 +86,17 @@ public class NativeQuery extends BaseQuery {
return suggester;
}
public void setSuggester(@Nullable Suggester suggester) {
this.suggester = suggester;
@Nullable
public FieldCollapse getFieldCollapse() {
return fieldCollapse;
}
public List<ScriptedField> getScriptedFields() {
return scriptedFields;
}
@Override
public List<RescorerQuery> getRescorerQueries() {
return rescorerQueries;
}
}

View File

@ -17,14 +17,19 @@ package org.springframework.data.elasticsearch.client.elc;
import co.elastic.clients.elasticsearch._types.aggregations.Aggregation;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.core.search.FieldCollapse;
import co.elastic.clients.elasticsearch.core.search.Suggester;
import co.elastic.clients.util.ObjectBuilder;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.springframework.data.elasticsearch.core.query.BaseQueryBuilder;
import org.springframework.data.elasticsearch.core.query.RescorerQuery;
import org.springframework.data.elasticsearch.core.query.ScriptedField;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@ -35,17 +40,47 @@ import org.springframework.util.Assert;
public class NativeQueryBuilder extends BaseQueryBuilder<NativeQuery, NativeQueryBuilder> {
@Nullable private Query query;
@Nullable private Query filter;
private final Map<String, Aggregation> aggregations = new LinkedHashMap<>();
@Nullable private Suggester suggester;
@Nullable private FieldCollapse fieldCollapse;
private final List<ScriptedField> scriptedFields = new ArrayList<>();
private List<RescorerQuery> rescorerQueries = new ArrayList<>();
public NativeQueryBuilder() {
}
public NativeQueryBuilder() {}
@Nullable
public Query getQuery() {
return query;
}
@Nullable
public Query getFilter() {
return this.filter;
}
public Map<String, Aggregation> getAggregations() {
return aggregations;
}
@Nullable
public Suggester getSuggester() {
return suggester;
}
@Nullable
public FieldCollapse getFieldCollapse() {
return fieldCollapse;
}
public List<ScriptedField> getScriptedFields() {
return scriptedFields;
}
public List<RescorerQuery> getRescorerQueries() {
return rescorerQueries;
}
public NativeQueryBuilder withQuery(Query query) {
Assert.notNull(query, "query must not be null");
@ -54,6 +89,11 @@ public class NativeQueryBuilder extends BaseQueryBuilder<NativeQuery, NativeQuer
return this;
}
public NativeQueryBuilder withFilter(@Nullable Query filter) {
this.filter = filter;
return this;
}
public NativeQueryBuilder withQuery(Function<Query.Builder, ObjectBuilder<Query>> fn) {
Assert.notNull(fn, "fn must not be null");
@ -75,11 +115,28 @@ public class NativeQueryBuilder extends BaseQueryBuilder<NativeQuery, NativeQuer
return this;
}
public NativeQuery build() {
NativeQuery nativeQuery = new NativeQuery(this);
nativeQuery.setAggregations(aggregations);
nativeQuery.setSuggester(suggester);
public NativeQueryBuilder withFieldCollapse(@Nullable FieldCollapse fieldCollapse) {
this.fieldCollapse = fieldCollapse;
return this;
}
return nativeQuery;
public NativeQueryBuilder withScriptedField(ScriptedField scriptedField) {
Assert.notNull(scriptedField, "scriptedField must not be null");
this.scriptedFields.add(scriptedField);
return this;
}
public NativeQueryBuilder withResorerQuery(RescorerQuery resorerQuery) {
Assert.notNull(resorerQuery, "resorerQuery must not be null");
this.rescorerQueries.add(resorerQuery);
return this;
}
public NativeQuery build() {
return new NativeQuery(this);
}
}

View File

@ -127,12 +127,51 @@ public class ReactiveElasticsearchClient extends ApiClient<ElasticsearchTranspor
return Mono.fromFuture(transport.performRequestAsync(request, endpoint, transportOptions));
}
public <T, P> Mono<UpdateResponse<T>> update(UpdateRequest<T, P> request, Class<T> clazz) {
Assert.notNull(request, "request must not be null");
// noinspection unchecked
JsonEndpoint<UpdateRequest<?, ?>, UpdateResponse<T>, ErrorResponse> endpoint = new EndpointWithResponseMapperAttr(
UpdateRequest._ENDPOINT, "co.elastic.clients:Deserializer:_global.update.TDocument",
this.getDeserializer(clazz));
return Mono.fromFuture(transport.performRequestAsync(request, endpoint, this.transportOptions));
}
public <T, P> Mono<UpdateResponse<T>> update(
Function<UpdateRequest.Builder<T, P>, ObjectBuilder<UpdateRequest<T, P>>> fn, Class<T> clazz) {
Assert.notNull(fn, "fn must not be null");
return update(fn.apply(new UpdateRequest.Builder<>()).build(), clazz);
}
public <T> Mono<GetResponse<T>> get(Function<GetRequest.Builder, ObjectBuilder<GetRequest>> fn, Class<T> tClass) {
Assert.notNull(fn, "fn must not be null");
return get(fn.apply(new GetRequest.Builder()).build(), tClass);
}
public <T> Mono<MgetResponse<T>> mget(MgetRequest request, Class<T> clazz) {
Assert.notNull(request, "request must not be null");
Assert.notNull(clazz, "clazz must not be null");
// noinspection unchecked
JsonEndpoint<MgetRequest, MgetResponse<T>, ErrorResponse> endpoint = (JsonEndpoint<MgetRequest, MgetResponse<T>, ErrorResponse>) MgetRequest._ENDPOINT;
endpoint = new EndpointWithResponseMapperAttr<>(endpoint, "co.elastic.clients:Deserializer:_global.mget.TDocument",
this.getDeserializer(clazz));
return Mono.fromFuture(transport.performRequestAsync(request, endpoint, transportOptions));
}
public <T> Mono<MgetResponse<T>> mget(Function<MgetRequest.Builder, ObjectBuilder<MgetRequest>> fn, Class<T> clazz) {
Assert.notNull(fn, "fn must not be null");
return mget(fn.apply(new MgetRequest.Builder()).build(), clazz);
}
public Mono<ReindexResponse> reindex(ReindexRequest request) {
Assert.notNull(request, "request must not be null");
@ -161,6 +200,21 @@ public class ReactiveElasticsearchClient extends ApiClient<ElasticsearchTranspor
return delete(fn.apply(new DeleteRequest.Builder()).build());
}
public Mono<DeleteByQueryResponse> deleteByQuery(DeleteByQueryRequest request) {
Assert.notNull(request, "request must not be null");
return Mono.fromFuture(transport.performRequestAsync(request, DeleteByQueryRequest._ENDPOINT, transportOptions));
}
public Mono<DeleteByQueryResponse> deleteByQuery(
Function<DeleteByQueryRequest.Builder, ObjectBuilder<DeleteByQueryRequest>> fn) {
Assert.notNull(fn, "fn must not be null");
return deleteByQuery(fn.apply(new DeleteByQueryRequest.Builder()).build());
}
// endregion
// region search

View File

@ -212,14 +212,6 @@ public class ReactiveElasticsearchIndicesClient
return existsTemplate(fn.apply(new ExistsTemplateRequest.Builder()).build());
}
public Mono<BooleanResponse> existsType(ExistsTypeRequest request) {
return Mono.fromFuture(transport.performRequestAsync(request, ExistsTypeRequest._ENDPOINT, transportOptions));
}
public Mono<BooleanResponse> existsType(Function<ExistsTypeRequest.Builder, ObjectBuilder<ExistsTypeRequest>> fn) {
return existsType(fn.apply(new ExistsTypeRequest.Builder()).build());
}
public Mono<FlushResponse> flush(FlushRequest request) {
return Mono.fromFuture(transport.performRequestAsync(request, FlushRequest._ENDPOINT, transportOptions));
}
@ -232,19 +224,6 @@ public class ReactiveElasticsearchIndicesClient
return flush(builder -> builder);
}
public Mono<FlushSyncedResponse> flushSynced(FlushSyncedRequest request) {
return Mono.fromFuture(transport.performRequestAsync(request, FlushSyncedRequest._ENDPOINT, transportOptions));
}
public Mono<FlushSyncedResponse> flushSynced(
Function<FlushSyncedRequest.Builder, ObjectBuilder<FlushSyncedRequest>> fn) {
return flushSynced(fn.apply(new FlushSyncedRequest.Builder()).build());
}
public Mono<FlushSyncedResponse> flushSynced() {
return flushSynced(builder -> builder);
}
@SuppressWarnings("SpellCheckingInspection")
public Mono<ForcemergeResponse> forcemerge(ForcemergeRequest request) {
return Mono.fromFuture(transport.performRequestAsync(request, ForcemergeRequest._ENDPOINT, transportOptions));
@ -260,14 +239,6 @@ public class ReactiveElasticsearchIndicesClient
return forcemerge(builder -> builder);
}
public Mono<FreezeResponse> freeze(FreezeRequest request) {
return Mono.fromFuture(transport.performRequestAsync(request, FreezeRequest._ENDPOINT, transportOptions));
}
public Mono<FreezeResponse> freeze(Function<FreezeRequest.Builder, ObjectBuilder<FreezeRequest>> fn) {
return freeze(fn.apply(new FreezeRequest.Builder()).build());
}
public Mono<GetIndexResponse> get(GetIndexRequest request) {
return Mono.fromFuture(transport.performRequestAsync(request, GetIndexRequest._ENDPOINT, transportOptions));
}
@ -363,18 +334,6 @@ public class ReactiveElasticsearchIndicesClient
return getTemplate(builder -> builder);
}
public Mono<GetUpgradeResponse> getUpgrade(GetUpgradeRequest request) {
return Mono.fromFuture(transport.performRequestAsync(request, GetUpgradeRequest._ENDPOINT, transportOptions));
}
public Mono<GetUpgradeResponse> getUpgrade(Function<GetUpgradeRequest.Builder, ObjectBuilder<GetUpgradeRequest>> fn) {
return getUpgrade(fn.apply(new GetUpgradeRequest.Builder()).build());
}
public Mono<GetUpgradeResponse> getUpgrade() {
return getUpgrade(builder -> builder);
}
public Mono<MigrateToDataStreamResponse> migrateToDataStream(MigrateToDataStreamRequest request) {
return Mono
.fromFuture(transport.performRequestAsync(request, MigrateToDataStreamRequest._ENDPOINT, transportOptions));
@ -601,18 +560,6 @@ public class ReactiveElasticsearchIndicesClient
return updateAliases(builder -> builder);
}
public Mono<UpgradeResponse> upgrade(UpgradeRequest request) {
return Mono.fromFuture(transport.performRequestAsync(request, UpgradeRequest._ENDPOINT, transportOptions));
}
public Mono<UpgradeResponse> upgrade(Function<UpgradeRequest.Builder, ObjectBuilder<UpgradeRequest>> fn) {
return upgrade(fn.apply(new UpgradeRequest.Builder()).build());
}
public Mono<UpgradeResponse> upgrade() {
return upgrade(builder -> builder);
}
public Mono<ValidateQueryResponse> validateQuery(ValidateQueryRequest request) {
return Mono.fromFuture(transport.performRequestAsync(request, ValidateQueryRequest._ENDPOINT, transportOptions));
}

View File

@ -15,10 +15,13 @@
*/
package org.springframework.data.elasticsearch.client.elc;
import static org.springframework.data.elasticsearch.client.elc.TypeUtils.*;
import co.elastic.clients.elasticsearch._types.Result;
import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import co.elastic.clients.elasticsearch.core.get.GetResult;
import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.transport.Version;
import reactor.core.publisher.Flux;
@ -46,6 +49,7 @@ import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperatio
import org.springframework.data.elasticsearch.core.ReactiveIndexOperations;
import org.springframework.data.elasticsearch.core.cluster.ReactiveClusterOperations;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.document.SearchDocument;
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
@ -103,6 +107,34 @@ public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearch
)));
}
@Override
public <T> Flux<T> saveAll(Mono<? extends Collection<? extends T>> entitiesPublisher, IndexCoordinates index) {
Assert.notNull(entitiesPublisher, "entitiesPublisher must not be null!");
return entitiesPublisher //
.flatMapMany(entities -> Flux.fromIterable(entities) //
.concatMap(entity -> maybeCallBeforeConvert(entity, index)) //
).collectList() //
.map(Entities::new) //
.flatMapMany(entities -> {
if (entities.isEmpty()) {
return Flux.empty();
}
return doBulkOperation(entities.indexQueries(), BulkOptions.defaultOptions(), index)//
.index() //
.flatMap(indexAndResponse -> {
T savedEntity = entities.entityAt(indexAndResponse.getT1());
BulkResponseItem response = indexAndResponse.getT2();
updateIndexedObject(savedEntity, IndexedObjectInformation.of(response.id(), response.seqNo(),
response.primaryTerm(), response.version()));
return maybeCallAfterSave(savedEntity, index);
});
});
}
@Override
public <T> Mono<T> get(String id, Class<T> entityType, IndexCoordinates index) {
@ -144,9 +176,9 @@ public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearch
return Mono.from(execute( //
(ClientCallback<Publisher<co.elastic.clients.elasticsearch.core.ReindexResponse>>) client -> client
.reindex(reindexRequestES)))
.flatMap(response -> (response.task() == null)
? Mono.error( // todo #1973 check behaviour and create issue in ES if necessary
new UnsupportedBackendOperation("ElasticsearchClient did not return a task id on submit request"))
.flatMap(response -> (response.task() == null) ? Mono.error( // todo #1973 check behaviour and create issue in
// ES if necessary
new UnsupportedBackendOperation("ElasticsearchClient did not return a task id on submit request"))
: Mono.just(response.task()));
}
@ -170,7 +202,7 @@ public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearch
}
private <R> Mono<BulkResponse> checkForBulkOperationFailure(BulkResponse bulkResponse) {
private Mono<BulkResponse> checkForBulkOperationFailure(BulkResponse bulkResponse) {
if (bulkResponse.errors()) {
Map<String, String> failedDocuments = new HashMap<>();
@ -214,6 +246,31 @@ public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearch
}).onErrorResume(NoSuchIndexException.class, it -> Mono.empty());
}
@Override
public <T> Flux<MultiGetItem<T>> multiGet(Query query, Class<T> clazz, IndexCoordinates index) {
Assert.notNull(query, "query must not be null");
Assert.notNull(clazz, "clazz must not be null");
MgetRequest request = requestConverter.documentMgetRequest(query, clazz, index);
ReadDocumentCallback<T> callback = new ReadDocumentCallback<>(converter, clazz, index);
Publisher<MgetResponse<EntityAsMap>> response = execute(
(ClientCallback<Publisher<MgetResponse<EntityAsMap>>>) client -> client.mget(request, EntityAsMap.class));
return Mono.from(response)//
.flatMapMany(it -> Flux.fromIterable(DocumentAdapters.from(it))) //
.flatMap(multiGetItem -> {
if (multiGetItem.isFailed()) {
return Mono.just(MultiGetItem.of(null, multiGetItem.getFailure()));
} else {
return callback.toEntity(multiGetItem.getItem()) //
.map(t -> MultiGetItem.of(t, multiGetItem.getFailure()));
}
});
}
// endregion
@Override
@ -223,8 +280,30 @@ public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearch
@Override
protected Mono<Boolean> doExists(String id, IndexCoordinates index) {
throw new UnsupportedOperationException("not implemented");
Assert.notNull(id, "id must not be null");
Assert.notNull(index, "index must not be null");
GetRequest getRequest = requestConverter.documentGetRequest(id, routingResolver.getRouting(), index, true);
return Mono.from(execute(
((ClientCallback<Publisher<GetResponse<EntityAsMap>>>) client -> client.get(getRequest, EntityAsMap.class))))
.map(GetResult::found) //
.onErrorReturn(NoSuchIndexException.class, false);
}
@Override
public Mono<ByQueryResponse> delete(Query query, Class<?> entityType, IndexCoordinates index) {
Assert.notNull(query, "query must not be null");
DeleteByQueryRequest request = requestConverter.documentDeleteByQueryRequest(query, entityType, index,
getRefreshPolicy());
return Mono
.from(execute((ClientCallback<Publisher<DeleteByQueryResponse>>) client -> client.deleteByQuery(request)))
.map(responseConverter::byQueryResponse);
}
// region search operations
@Override
@ -307,7 +386,7 @@ public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearch
SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, false, false);
// noinspection unchecked
SearchDocumentCallback<T> callback = new ReadSearchDocumentCallback<T>((Class<T>) clazz, index);
SearchDocumentCallback<T> callback = new ReadSearchDocumentCallback<>((Class<T>) clazz, index);
SearchDocumentResponse.EntityCreator<T> entityCreator = searchDocument -> callback.toEntity(searchDocument)
.toFuture();
@ -317,6 +396,15 @@ public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearch
.map(searchResponse -> SearchDocumentResponseBuilder.from(searchResponse, entityCreator, jsonpMapper));
}
@Override
public Flux<? extends AggregationContainer<?>> aggregate(Query query, Class<?> entityType, IndexCoordinates index) {
return doFindForResponse(query, entityType, index).flatMapMany(searchDocumentResponse -> {
ElasticsearchAggregations aggregations = (ElasticsearchAggregations) searchDocumentResponse.getAggregations();
return aggregations == null ? Flux.empty() : Flux.fromIterable(aggregations.aggregations());
});
}
// endregion
@Override
@ -326,7 +414,7 @@ public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearch
@Override
protected Mono<String> getRuntimeLibraryVersion() {
return Mono.just(Version.VERSION.toString());
return Mono.just(Version.VERSION != null ? Version.VERSION.toString() : "null");
}
@Override
@ -334,47 +422,22 @@ public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearch
return Mono.from(execute(ReactiveElasticsearchClient::info)).map(infoResponse -> infoResponse.version().number());
}
@Override
public <T> Flux<T> saveAll(Mono<? extends Collection<? extends T>> entitiesPublisher, IndexCoordinates index) {
Assert.notNull(entitiesPublisher, "entitiesPublisher must not be null!");
return entitiesPublisher //
.flatMapMany(entities -> Flux.fromIterable(entities) //
.concatMap(entity -> maybeCallBeforeConvert(entity, index)) //
).collectList() //
.map(Entities::new) //
.flatMapMany(entities -> {
if (entities.isEmpty()) {
return Flux.empty();
}
return doBulkOperation(entities.indexQueries(), BulkOptions.defaultOptions(), index)//
.index() //
.flatMap(indexAndResponse -> {
T savedEntity = entities.entityAt(indexAndResponse.getT1());
BulkResponseItem response = indexAndResponse.getT2();
updateIndexedObject(savedEntity, IndexedObjectInformation.of(response.id(), response.seqNo(),
response.primaryTerm(), response.version()));
return maybeCallAfterSave(savedEntity, index);
});
});
}
@Override
public <T> Flux<MultiGetItem<T>> multiGet(Query query, Class<T> clazz, IndexCoordinates index) {
throw new UnsupportedOperationException("not implemented");
}
@Override
public Mono<ByQueryResponse> delete(Query query, Class<?> entityType, IndexCoordinates index) {
throw new UnsupportedOperationException("not implemented");
}
@Override
public Mono<UpdateResponse> update(UpdateQuery updateQuery, IndexCoordinates index) {
throw new UnsupportedOperationException("not implemented");
Assert.notNull(updateQuery, "UpdateQuery must not be null");
Assert.notNull(index, "Index must not be null");
UpdateRequest<Document, ?> request = requestConverter.documentUpdateRequest(updateQuery, index, getRefreshPolicy(),
routingResolver.getRouting());
return Mono.from(execute(
(ClientCallback<Publisher<co.elastic.clients.elasticsearch.core.UpdateResponse<Document>>>) client -> client
.update(request, Document.class)))
.flatMap(response -> {
UpdateResponse.Result result = result(response.result());
return result == null ? Mono.empty() : Mono.just(UpdateResponse.of(result));
});
}
@Override
@ -413,11 +476,6 @@ public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearch
return new ReactiveClusterTemplate(client.cluster(), converter);
}
@Override
public Flux<AggregationContainer<?>> aggregate(Query query, Class<?> entityType, IndexCoordinates index) {
throw new UnsupportedOperationException("not implemented");
}
@Override
public Flux<Suggest> suggest(SuggestBuilder suggestion, Class<?> entityType) {
throw new UnsupportedOperationException("not implemented");

View File

@ -297,7 +297,7 @@ public class ReactiveIndicesTemplate extends ReactiveChildTemplate<ReactiveElast
co.elastic.clients.elasticsearch.indices.ExistsTemplateRequest existsTemplateRequestES = requestConverter
.indicesExistsTemplateRequest(existsTemplateRequest);
return Mono.from(execute(client1 -> client1.existsTemplate(existsTemplateRequestES))).map(BooleanResponse::value);
return Mono.from(execute(client -> client.existsTemplate(existsTemplateRequestES))).map(BooleanResponse::value);
}
@Override
@ -307,13 +307,18 @@ public class ReactiveIndicesTemplate extends ReactiveChildTemplate<ReactiveElast
co.elastic.clients.elasticsearch.indices.DeleteTemplateRequest deleteTemplateRequestES = requestConverter
.indicesDeleteTemplateRequest(deleteTemplateRequest);
return Mono.from(execute(client1 -> client1.deleteTemplate(deleteTemplateRequestES)))
return Mono.from(execute(client -> client.deleteTemplate(deleteTemplateRequestES)))
.map(DeleteTemplateResponse::acknowledged);
}
@Override
public Flux<IndexInformation> getInformation(IndexCoordinates index) {
throw new UnsupportedOperationException("not implemented");
GetIndexRequest request = requestConverter.indicesGetIndexRequest(index);
return Mono.from(execute(client -> client.get(request))) //
.map(responseConverter::indicesGetIndexInformations) //
.flatMapMany(Flux::fromIterable);
}
@Override

View File

@ -16,14 +16,15 @@
package org.springframework.data.elasticsearch.client.elc;
import static org.springframework.data.elasticsearch.client.elc.TypeUtils.*;
import static org.springframework.util.ObjectUtils.*;
import static org.springframework.util.CollectionUtils.*;
import co.elastic.clients.elasticsearch._types.Conflicts;
import co.elastic.clients.elasticsearch._types.InlineScript;
import co.elastic.clients.elasticsearch._types.OpType;
import co.elastic.clients.elasticsearch._types.SortOptions;
import co.elastic.clients.elasticsearch._types.SortOrder;
import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch._types.VersionType;
import co.elastic.clients.elasticsearch._types.WaitForActiveShardOptions;
import co.elastic.clients.elasticsearch._types.mapping.FieldType;
import co.elastic.clients.elasticsearch._types.mapping.Property;
import co.elastic.clients.elasticsearch._types.mapping.RuntimeField;
@ -31,20 +32,17 @@ import co.elastic.clients.elasticsearch._types.mapping.RuntimeFieldType;
import co.elastic.clients.elasticsearch._types.mapping.TypeMapping;
import co.elastic.clients.elasticsearch._types.query_dsl.Like;
import co.elastic.clients.elasticsearch.cluster.HealthRequest;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.DeleteByQueryRequest;
import co.elastic.clients.elasticsearch.core.DeleteRequest;
import co.elastic.clients.elasticsearch.core.GetRequest;
import co.elastic.clients.elasticsearch.core.IndexRequest;
import co.elastic.clients.elasticsearch.core.MgetRequest;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.bulk.CreateOperation;
import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
import co.elastic.clients.elasticsearch.core.bulk.UpdateOperation;
import co.elastic.clients.elasticsearch.core.mget.MultiGetOperation;
import co.elastic.clients.elasticsearch.core.search.Highlight;
import co.elastic.clients.elasticsearch.core.search.Rescore;
import co.elastic.clients.elasticsearch.core.search.SourceConfig;
import co.elastic.clients.elasticsearch.indices.*;
import co.elastic.clients.elasticsearch.indices.ExistsRequest;
import co.elastic.clients.elasticsearch.indices.update_aliases.Action;
import co.elastic.clients.json.JsonData;
import co.elastic.clients.json.JsonpDeserializer;
@ -58,6 +56,7 @@ import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -67,6 +66,7 @@ import java.util.stream.Collectors;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.core.RefreshPolicy;
import org.springframework.data.elasticsearch.core.ScriptType;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.index.AliasAction;
@ -562,6 +562,79 @@ class RequestConverter {
return builder.build();
}
private UpdateOperation<?, ?> bulkUpdateOperation(UpdateQuery query, IndexCoordinates index,
@Nullable RefreshPolicy refreshPolicy) {
UpdateOperation.Builder<Object, Object> uob = new UpdateOperation.Builder<>();
String indexName = query.getIndexName() != null ? query.getIndexName() : index.getIndexName();
uob.index(indexName).id(query.getId());
uob.action(a -> {
a //
.script(getScript(query.getScriptData())) //
.doc(query.getDocument()) //
.upsert(query.getUpsert()) //
.scriptedUpsert(query.getScriptedUpsert()) //
.docAsUpsert(query.getDocAsUpsert()) //
;
if (query.getFetchSource() != null) {
a.source(sc -> sc.fetch(query.getFetchSource()));
}
if (query.getFetchSourceIncludes() != null || query.getFetchSourceExcludes() != null) {
List<String> includes = query.getFetchSourceIncludes() != null ? query.getFetchSourceIncludes()
: Collections.emptyList();
List<String> excludes = query.getFetchSourceExcludes() != null ? query.getFetchSourceExcludes()
: Collections.emptyList();
a.source(sc -> sc.filter(sf -> sf.includes(includes).excludes(excludes)));
}
return a;
});
uob //
.routing(query.getRouting()) //
.ifSeqNo(query.getIfSeqNo() != null ? Long.valueOf(query.getIfSeqNo()) : null) //
.ifPrimaryTerm(query.getIfPrimaryTerm() != null ? Long.valueOf(query.getIfPrimaryTerm()) : null) //
.retryOnConflict(query.getRetryOnConflict()) //
;
// no refresh, timeout, waitForActiveShards on UpdateOperation or UpdateAction
return uob.build();
}
@Nullable
private co.elastic.clients.elasticsearch._types.Script getScript(@Nullable ScriptData scriptData) {
if (scriptData == null) {
return null;
}
Map<String, JsonData> params = new HashMap<>();
if (scriptData.getParams() != null) {
scriptData.getParams().forEach((key, value) -> {
params.put(key, JsonData.of(value, jsonpMapper));
});
}
return co.elastic.clients.elasticsearch._types.Script.of(sb -> {
if (scriptData.getType() == ScriptType.INLINE) {
sb.inline(is -> is //
.lang(scriptData.getLanguage()) //
.source(scriptData.getScript()) //
.params(params)); //
} else if (scriptData.getType() == ScriptType.STORED) {
sb.stored(ss -> ss //
.id(scriptData.getScript()) //
.params(params) //
);
}
return sb;
});
}
public BulkRequest documentBulkRequest(List<?> queries, BulkOptions bulkOptions, IndexCoordinates indexCoordinates,
@Nullable RefreshPolicy refreshPolicy) {
@ -599,7 +672,8 @@ class RequestConverter {
ob.index(bulkIndexOperation(indexQuery, indexCoordinates, refreshPolicy));
}
} else if (query instanceof UpdateQuery) {
// todo #1973
UpdateQuery updateQuery = (UpdateQuery) query;
ob.update(bulkUpdateOperation(updateQuery, indexCoordinates, refreshPolicy));
}
return ob.build();
}).collect(Collectors.toList());
@ -674,7 +748,7 @@ class RequestConverter {
}
if (source.getQuery() != null) {
s.query(getQuery(source.getQuery()));
s.query(getQuery(source.getQuery(), null));
}
if (source.getRemote() != null) {
@ -731,13 +805,8 @@ class RequestConverter {
builder.script(s -> s.inline(InlineScript.of(i -> i.lang(script.getLang()).source(script.getSource()))));
}
if (reindexRequest.getTimeout() != null) {
builder.timeout(tv -> tv.time(reindexRequest.getTimeout().toMillis() + "ms"));
}
if (reindexRequest.getScroll() != null) {
builder.scroll(tv -> tv.time(reindexRequest.getScroll().toMillis() + "ms"));
}
builder.timeout(time(reindexRequest.getTimeout())) //
.scroll(time(reindexRequest.getScroll()));
if (reindexRequest.getWaitForActiveShards() != null) {
builder.waitForActiveShards(wfas -> wfas //
@ -779,7 +848,7 @@ class RequestConverter {
return DeleteByQueryRequest.of(b -> {
b.index(Arrays.asList(index.getIndexNames())) //
.query(getQuery(query))//
.query(getQuery(query, clazz))//
.refresh(deleteByQueryRefresh(refreshPolicy));
if (query.isLimiting()) {
@ -787,10 +856,7 @@ class RequestConverter {
b.maxDocs(Long.valueOf(query.getMaxResults()));
}
if (query.hasScrollTime()) {
// noinspection ConstantConditions
b.scroll(Time.of(t -> t.time(query.getScrollTime().toMillis() + "ms")));
}
b.scroll(time(query.getScrollTime()));
if (query.getRoute() != null) {
b.routing(query.getRoute());
@ -800,6 +866,140 @@ class RequestConverter {
});
}
public UpdateRequest<Document, ?> documentUpdateRequest(UpdateQuery query, IndexCoordinates index,
@Nullable RefreshPolicy refreshPolicy, @Nullable String routing) {
String indexName = query.getIndexName() != null ? query.getIndexName() : index.getIndexName();
return UpdateRequest.of(uqb -> {
uqb.index(indexName).id(query.getId());
if (query.getScript() != null) {
Map<String, JsonData> params = new HashMap<>();
if (query.getParams() != null) {
query.getParams().forEach((key, value) -> {
params.put(key, JsonData.of(value, jsonpMapper));
});
}
uqb.script(sb -> {
if (query.getScriptType() == ScriptType.INLINE) {
sb.inline(is -> is //
.lang(query.getLang()) //
.source(query.getScript()) //
.params(params)); //
} else if (query.getScriptType() == ScriptType.STORED) {
sb.stored(ss -> ss //
.id(query.getScript()) //
.params(params) //
);
}
return sb;
}
);
}
uqb //
.doc(query.getDocument()) //
.upsert(query.getUpsert()) //
.routing(query.getRouting() != null ? query.getRouting() : routing) //
.scriptedUpsert(query.getScriptedUpsert()) //
.docAsUpsert(query.getDocAsUpsert()) //
.ifSeqNo(query.getIfSeqNo() != null ? Long.valueOf(query.getIfSeqNo()) : null) //
.ifPrimaryTerm(query.getIfPrimaryTerm() != null ? Long.valueOf(query.getIfPrimaryTerm()) : null) //
.refresh(refresh(refreshPolicy)) //
.retryOnConflict(query.getRetryOnConflict()) //
;
if (query.getFetchSource() != null) {
uqb.source(sc -> sc.fetch(query.getFetchSource()));
}
if (query.getFetchSourceIncludes() != null || query.getFetchSourceExcludes() != null) {
List<String> includes = query.getFetchSourceIncludes() != null ? query.getFetchSourceIncludes()
: Collections.emptyList();
List<String> excludes = query.getFetchSourceExcludes() != null ? query.getFetchSourceExcludes()
: Collections.emptyList();
uqb.source(sc -> sc.filter(sf -> sf.includes(includes).excludes(excludes)));
}
if (query.getTimeout() != null) {
uqb.timeout(tv -> tv.time(query.getTimeout()));
}
String waitForActiveShards = query.getWaitForActiveShards();
if (waitForActiveShards != null) {
if ("all".equalsIgnoreCase(waitForActiveShards)) {
uqb.waitForActiveShards(wfa -> wfa.option(WaitForActiveShardOptions.All));
} else {
int val;
try {
val = Integer.parseInt(waitForActiveShards);
} catch (NumberFormatException var3) {
throw new IllegalArgumentException("cannot parse ActiveShardCount[" + waitForActiveShards + "]", var3);
}
uqb.waitForActiveShards(wfa -> wfa.count(val));
}
}
return uqb;
} //
);
}
public UpdateByQueryRequest documentUpdateByQueryRequest(UpdateQuery updateQuery, IndexCoordinates index,
@Nullable RefreshPolicy refreshPolicy) {
return UpdateByQueryRequest.of(ub -> {
ub //
.index(Arrays.asList(index.getIndexNames())) //
.refresh(refreshPolicy == RefreshPolicy.IMMEDIATE) //
.routing(updateQuery.getRouting()) //
.script(getScript(updateQuery.getScriptData())) //
.maxDocs(updateQuery.getMaxDocs() != null ? Long.valueOf(updateQuery.getMaxDocs()) : null) //
.pipeline(updateQuery.getPipeline()) //
.requestsPerSecond(
updateQuery.getRequestsPerSecond() != null ? updateQuery.getRequestsPerSecond().longValue() : null) //
.slices(updateQuery.getSlices() != null ? Long.valueOf(updateQuery.getSlices()) : null) //
;
if (updateQuery.getAbortOnVersionConflict() != null) {
ub.conflicts(updateQuery.getAbortOnVersionConflict() ? Conflicts.Abort : Conflicts.Proceed);
}
if (updateQuery.getBatchSize() != null) {
ub.size(Long.valueOf(updateQuery.getBatchSize()));
}
if (updateQuery.getQuery() != null) {
Query queryQuery = updateQuery.getQuery();
ub.query(getQuery(queryQuery, null));
// no indicesOptions available like in old client
ub.scroll(time(queryQuery.getScrollTime()));
}
// no maxRetries available like in old client
// no shouldStoreResult
if (updateQuery.getRefreshPolicy() != null) {
ub.refresh(updateQuery.getRefreshPolicy() == RefreshPolicy.IMMEDIATE);
}
if (updateQuery.getTimeout() != null) {
ub.timeout(tb -> tb.time(updateQuery.getTimeout()));
}
if (updateQuery.getWaitForActiveShards() != null) {
ub.waitForActiveShards(w -> w.count(waitForActiveShardsCount(updateQuery.getWaitForActiveShards())));
}
return ub;
});
}
// endregion
// region search
@ -831,13 +1031,36 @@ class RequestConverter {
builder.scroll(t -> t.time(scrollTimeInMillis + "ms"));
}
builder.query(getQuery(query));
builder.query(getQuery(query, clazz));
addFilter(query, builder);
return builder.build();
}
public MsearchRequest searchMsearchRequest(
List<ElasticsearchTemplate.MultiSearchQueryParameter> multiSearchQueryParameters) {
return MsearchRequest.of(mrb -> {
multiSearchQueryParameters.forEach(param -> {
ElasticsearchPersistentEntity<?> persistentEntity = getPersistentEntity(param.clazz);
mrb.searches(sb -> sb //
.header(h -> h //
.index(param.index.getIndexName()) //
// todo #1973 add remaining flags for header
) //
.body(bb -> bb //
.query(getQuery(param.query, param.clazz))//
// #1973 seq_no_primary_term and version not available in client ES issue 161
// todo #1973 add remaining flags for body
) //
);
});
return mrb;
});
}
private <T> void prepareSearchRequest(Query query, @Nullable Class<T> clazz, IndexCoordinates indexCoordinates,
SearchRequest.Builder builder, boolean forCount, boolean useScroll) {
@ -930,7 +1153,11 @@ class RequestConverter {
if (!isEmpty(query.getSearchAfter())) {
builder.searchAfter(query.getSearchAfter().stream().map(Object::toString).collect(Collectors.toList()));
}
// todo #1973 rescorer queries
query.getRescorerQueries().forEach(rescorerQuery -> {
builder.rescore(getRescore(rescorerQuery));
});
// todo #1973 request cache
if (!query.getRuntimeFields().isEmpty()) {
@ -952,12 +1179,27 @@ class RequestConverter {
// request_cache is not allowed on scroll requests.
builder.requestCache(null);
Duration scrollTimeout = query.getScrollTime() != null ? query.getScrollTime() : Duration.ofMinutes(1);
builder.scroll(tv -> tv.time(scrollTimeout.toMillis() + "ms"));
builder.scroll(time(scrollTimeout));
// limit the number of documents in a batch
builder.size(500);
}
}
private Rescore getRescore(RescorerQuery rescorerQuery) {
return Rescore.of(r -> r //
.query(rq -> rq //
.query(getQuery(rescorerQuery.getQuery(), null)) //
.scoreMode(scoreMode(rescorerQuery.getScoreMode())) //
.queryWeight(rescorerQuery.getQueryWeight() != null ? Double.valueOf(rescorerQuery.getQueryWeight()) : 1.0) //
.rescoreQueryWeight(
rescorerQuery.getRescoreQueryWeight() != null ? Double.valueOf(rescorerQuery.getRescoreQueryWeight())
: 1.0) //
) //
.windowSize(rescorerQuery.getWindowSize()));
}
private void addHighlight(Query query, SearchRequest.Builder builder) {
Highlight highlight = query.getHighlightQuery()
@ -1032,23 +1274,35 @@ class RequestConverter {
}
private void prepareNativeSearch(NativeQuery query, SearchRequest.Builder builder) {
// todo #1973 script fields
// todo #1973 collapse builder
query.getScriptedFields().forEach(scriptedField -> {
builder.scriptFields(scriptedField.getFieldName(), sf -> sf.script(getScript(scriptedField.getScriptData())));
});
builder //
.suggest(query.getSuggester()) //
.collapse(query.getFieldCollapse()) //
;
// todo #1973 indices boost
if (!isEmpty(query.getAggregations())) {
builder.aggregations(query.getAggregations());
}
builder.suggest(query.getSuggester());
// todo #1973 searchExt
}
@Nullable
private co.elastic.clients.elasticsearch._types.query_dsl.Query getQuery(Query query) {
private co.elastic.clients.elasticsearch._types.query_dsl.Query getQuery(@Nullable Query query,
@Nullable Class<?> clazz) {
if (query == null) {
return null;
}
elasticsearchConverter.updateQuery(query, clazz);
// todo #1973 some native stuff
co.elastic.clients.elasticsearch._types.query_dsl.Query esQuery = null;
if (query instanceof CriteriaQuery) {
@ -1074,7 +1328,7 @@ class RequestConverter {
} else if (query instanceof StringQuery) {
// no filter for StringQuery
} else if (query instanceof NativeQuery) {
// todo #1973 NativeQuery filter
builder.postFilter(((NativeQuery) query).getFilter());
} else {
throw new IllegalArgumentException("unhandled Query implementation " + query.getClass().getName());
}
@ -1128,6 +1382,7 @@ class RequestConverter {
return moreLikeThisQuery;
}
// endregion
// region helper functions

View File

@ -23,6 +23,7 @@ import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.cluster.HealthResponse;
import co.elastic.clients.elasticsearch.core.DeleteByQueryResponse;
import co.elastic.clients.elasticsearch.core.UpdateByQueryResponse;
import co.elastic.clients.elasticsearch.core.mget.MultiGetError;
import co.elastic.clients.elasticsearch.core.mget.MultiGetResponseItem;
import co.elastic.clients.elasticsearch.indices.*;
@ -314,6 +315,56 @@ class ResponseConverter {
}
public ByQueryResponse byQueryResponse(DeleteByQueryResponse response) {
// the code for the methods taking a DeleteByQueryResponse or a UpdateByQueryResponse is duplicated because the
// Elasticsearch responses do not share a common class
// noinspection DuplicatedCode
List<ByQueryResponse.Failure> failures = response.failures().stream().map(this::byQueryResponseFailureOf)
.collect(Collectors.toList());
ByQueryResponse.ByQueryResponseBuilder builder = ByQueryResponse.builder();
if (response.took() != null) {
builder.withTook(response.took());
}
if (response.timedOut() != null) {
builder.withTimedOut(response.timedOut());
}
if (response.total() != null) {
builder.withTotal(response.total());
}
if (response.deleted() != null) {
builder.withDeleted(response.deleted());
}
if (response.batches() != null) {
builder.withBatches(Math.toIntExact(response.batches()));
}
if (response.versionConflicts() != null) {
builder.withVersionConflicts(response.versionConflicts());
}
if (response.noops() != null) {
builder.withNoops(response.noops());
}
if (response.retries() != null) {
builder.withBulkRetries(response.retries().bulk());
builder.withSearchRetries(response.retries().search());
}
builder.withFailures(failures);
return builder.build();
}
public ByQueryResponse byQueryResponse(UpdateByQueryResponse response) {
// the code for the methods taking a DeleteByQueryResponse or a UpdateByQueryResponse is duplicated because the
// Elasticsearch responses do not share a common class
// noinspection DuplicatedCode
List<ByQueryResponse.Failure> failures = response.failures().stream().map(this::byQueryResponseFailureOf)
.collect(Collectors.toList());
@ -358,8 +409,8 @@ class ResponseConverter {
}
// endregion
// region helper functions
private long timeToLong(Time time) {
if (time.isTime()) {

View File

@ -20,7 +20,9 @@ import co.elastic.clients.elasticsearch._types.DistanceUnit;
import co.elastic.clients.elasticsearch._types.GeoDistanceType;
import co.elastic.clients.elasticsearch._types.OpType;
import co.elastic.clients.elasticsearch._types.Refresh;
import co.elastic.clients.elasticsearch._types.Result;
import co.elastic.clients.elasticsearch._types.SortMode;
import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch._types.VersionType;
import co.elastic.clients.elasticsearch._types.mapping.FieldType;
import co.elastic.clients.elasticsearch.core.search.BoundaryScanner;
@ -30,11 +32,16 @@ import co.elastic.clients.elasticsearch.core.search.HighlighterFragmenter;
import co.elastic.clients.elasticsearch.core.search.HighlighterOrder;
import co.elastic.clients.elasticsearch.core.search.HighlighterTagsSchema;
import co.elastic.clients.elasticsearch.core.search.HighlighterType;
import co.elastic.clients.elasticsearch.core.search.ScoreMode;
import java.time.Duration;
import org.springframework.data.elasticsearch.core.RefreshPolicy;
import org.springframework.data.elasticsearch.core.query.GeoDistanceOrder;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.Order;
import org.springframework.data.elasticsearch.core.query.RescorerQuery;
import org.springframework.data.elasticsearch.core.query.UpdateResponse;
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest;
import org.springframework.lang.Nullable;
@ -243,6 +250,54 @@ final class TypeUtils {
}
}
@Nullable
static UpdateResponse.Result result(@Nullable Result result) {
if (result == null) {
return null;
}
switch (result) {
case Created:
return UpdateResponse.Result.CREATED;
case Updated:
return UpdateResponse.Result.UPDATED;
case Deleted:
return UpdateResponse.Result.DELETED;
case NotFound:
return UpdateResponse.Result.NOT_FOUND;
case NoOp:
return UpdateResponse.Result.NOOP;
}
return null;
}
@Nullable
static ScoreMode scoreMode(@Nullable RescorerQuery.ScoreMode scoreMode) {
if (scoreMode == null) {
return null;
}
switch (scoreMode) {
case Default:
return null;
case Avg:
return ScoreMode.Avg;
case Max:
return ScoreMode.Max;
case Min:
return ScoreMode.Min;
case Total:
return ScoreMode.Total;
case Multiply:
return ScoreMode.Multiply;
}
return null;
}
@Nullable
static SortMode sortMode(Order.Mode mode) {
@ -260,6 +315,16 @@ final class TypeUtils {
return null;
}
@Nullable
static Time time(@Nullable Duration duration) {
if (duration == null) {
return null;
}
return Time.of(t -> t.time(duration.toMillis() + "ms"));
}
@Nullable
static VersionType versionType(
@Nullable org.springframework.data.elasticsearch.annotations.Document.VersionType versionType) {

View File

@ -73,7 +73,7 @@ abstract public class AbstractReactiveElasticsearchTemplate
protected final SimpleElasticsearchMappingContext mappingContext;
protected final EntityOperations entityOperations;
protected @Nullable RefreshPolicy refreshPolicy = RefreshPolicy.IMMEDIATE;
protected @Nullable RefreshPolicy refreshPolicy = RefreshPolicy.NONE;
protected RoutingResolver routingResolver;
protected @Nullable ReactiveEntityCallbacks entityCallbacks;
@ -443,7 +443,7 @@ abstract public class AbstractReactiveElasticsearchTemplate
IndexCoordinates index);
@Override
public Flux<AggregationContainer<?>> aggregate(Query query, Class<?> entityType) {
public Flux<? extends AggregationContainer<?>> aggregate(Query query, Class<?> entityType) {
return aggregate(query, entityType, getIndexCoordinatesFor(entityType));
}

View File

@ -22,8 +22,8 @@ import java.util.List;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.suggest.response.Suggest;
@ -237,7 +237,7 @@ public interface ReactiveSearchOperations {
* @return a {@link Flux} emitting matching aggregations one by one.
* @since 4.0
*/
Flux<AggregationContainer<?>> aggregate(Query query, Class<?> entityType);
Flux<? extends AggregationContainer<?>> aggregate(Query query, Class<?> entityType);
/**
* Perform an aggregation specified by the given {@link Query query}. <br />
@ -248,7 +248,7 @@ public interface ReactiveSearchOperations {
* @return a {@link Flux} emitting matching aggregations one by one.
* @since 4.0
*/
Flux<AggregationContainer<?>> aggregate(Query query, Class<?> entityType, IndexCoordinates index);
Flux<? extends AggregationContainer<?>> aggregate(Query query, Class<?> entityType, IndexCoordinates index);
/**
* Does a suggest query

View File

@ -62,15 +62,15 @@ public class BaseQuery implements Query {
@Nullable protected Integer maxResults;
@Nullable protected HighlightQuery highlightQuery;
@Nullable private Boolean trackTotalHits;
@Nullable private Integer trackTotalHitsUpTo;
@Nullable private Duration scrollTime;
@Nullable private Duration timeout;
@Nullable protected Integer trackTotalHitsUpTo;
@Nullable protected Duration scrollTime;
@Nullable protected Duration timeout;
private boolean explain = false;
@Nullable private List<Object> searchAfter;
@Nullable protected List<Object> searchAfter;
protected List<RescorerQuery> rescorerQueries = new ArrayList<>();
@Nullable protected Boolean requestCache;
private List<IdWithRouting> idsWithRouting = Collections.emptyList();
private final List<RuntimeField> runtimeFields = new ArrayList<>();
protected List<IdWithRouting> idsWithRouting = Collections.emptyList();
protected final List<RuntimeField> runtimeFields = new ArrayList<>();
public BaseQuery() {}
@ -86,6 +86,8 @@ public class BaseQuery implements Query {
this.preference = builder.getPreference();
this.sourceFilter = builder.getSourceFilter();
this.fields = builder.getFields();
this.highlightQuery = builder.highlightQuery;
this.route = builder.getRoute();
// #1973 add the other fields to the builder
}

View File

@ -44,6 +44,8 @@ public abstract class BaseQueryBuilder<Q extends BaseQuery, SELF extends BaseQue
@Nullable private String preference;
@Nullable private SourceFilter sourceFilter;
private List<String> fields = new ArrayList<>();
@Nullable protected HighlightQuery highlightQuery;
@Nullable private String route;
@Nullable
public Pageable getPageable() {
@ -92,6 +94,16 @@ public abstract class BaseQueryBuilder<Q extends BaseQuery, SELF extends BaseQue
return fields;
}
@Nullable
public HighlightQuery getHighlightQuery() {
return highlightQuery;
}
@Nullable
public String getRoute() {
return route;
}
public SELF withPageable(Pageable pageable) {
this.pageable = pageable;
return self();
@ -156,6 +168,16 @@ public abstract class BaseQueryBuilder<Q extends BaseQuery, SELF extends BaseQue
return self();
}
public SELF withHighlightQuery(HighlightQuery highlightQuery) {
this.highlightQuery = highlightQuery;
return self();
}
public SELF withRoute(String route) {
this.route = route;
return self();
}
public abstract Q build();
private SELF self() {

View File

@ -65,7 +65,6 @@ public class NativeSearchQueryBuilder extends BaseQueryBuilder<NativeSearchQuery
@Nullable private CollapseBuilder collapseBuilder;
@Nullable private List<IndexBoost> indicesBoost = new ArrayList<>();
@Nullable private SearchTemplateRequestBuilder searchTemplateBuilder;
@Nullable private String route;
@Nullable private SearchType searchType;
@Nullable private Boolean trackTotalHits;
@Nullable private Duration timeout;
@ -232,11 +231,6 @@ public class NativeSearchQueryBuilder extends BaseQueryBuilder<NativeSearchQuery
return this;
}
public NativeSearchQueryBuilder withRoute(String route) {
this.route = route;
return this;
}
public NativeSearchQueryBuilder withSearchType(SearchType searchType) {
this.searchType = searchType;
return this;
@ -320,10 +314,6 @@ public class NativeSearchQueryBuilder extends BaseQueryBuilder<NativeSearchQuery
nativeSearchQuery.setPipelineAggregations(pipelineAggregationBuilders);
}
if (route != null) {
nativeSearchQuery.setRoute(route);
}
if (searchType != null) {
nativeSearchQuery.setSearchType(Query.SearchType.valueOf(searchType.name()));
}

View File

@ -0,0 +1,70 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.query;
import java.util.Map;
import org.springframework.data.elasticsearch.core.ScriptType;
import org.springframework.lang.Nullable;
/**
* value class combining script information.
*
* @author Peter-Josef Meisch
* @since 4.4
*/
public final class ScriptData {
@Nullable private final ScriptType type;
@Nullable private final String language;
@Nullable private final String script;
@Nullable private final String scriptName;
@Nullable private final Map<String, Object> params;
public ScriptData(@Nullable ScriptType type, @Nullable String language, @Nullable String script,
@Nullable String scriptName, @Nullable Map<String, Object> params) {
this.type = type;
this.language = language;
this.script = script;
this.scriptName = scriptName;
this.params = params;
}
@Nullable
public ScriptType getType() {
return type;
}
@Nullable
public String getLanguage() {
return language;
}
@Nullable
public String getScript() {
return script;
}
@Nullable
public String getScriptName() {
return scriptName;
}
@Nullable
public Map<String, Object> getParams() {
return params;
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.query;
import org.springframework.util.Assert;
/**
* @author Peter-Josef Meisch
* @since 4.4
*/
public class ScriptedField {
private final String fieldName;
private final ScriptData scriptData;
public ScriptedField(String fieldName, ScriptData scriptData) {
Assert.notNull(fieldName, "fieldName must not be null");
Assert.notNull(scriptData, "scriptData must not be null");
this.fieldName = fieldName;
this.scriptData = scriptData;
}
public String getFieldName() {
return fieldName;
}
public ScriptData getScriptData() {
return scriptData;
}
}

View File

@ -35,11 +35,8 @@ import org.springframework.lang.Nullable;
public class UpdateQuery {
private final String id;
@Nullable private final String script;
@Nullable private final Map<String, Object> params;
@Nullable private final Document document;
@Nullable private final Document upsert;
@Nullable private final String lang;
@Nullable private final String routing;
@Nullable private final Boolean scriptedUpsert;
@Nullable private final Boolean docAsUpsert;
@ -61,9 +58,8 @@ public class UpdateQuery {
@Nullable private final Float requestsPerSecond;
@Nullable private final Boolean shouldStoreResult;
@Nullable private final Integer slices;
@Nullable private final ScriptType scriptType;
@Nullable private final String scriptName;
@Nullable private final String indexName;
@Nullable private final ScriptData scriptData;
public static Builder builder(String id) {
return new Builder(id);
@ -85,11 +81,8 @@ public class UpdateQuery {
@Nullable String scriptName, @Nullable String indexName) {
this.id = id;
this.script = script;
this.params = params;
this.document = document;
this.upsert = upsert;
this.lang = lang;
this.routing = routing;
this.scriptedUpsert = scriptedUpsert;
this.docAsUpsert = docAsUpsert;
@ -111,9 +104,13 @@ public class UpdateQuery {
this.requestsPerSecond = requestsPerSecond;
this.shouldStoreResult = shouldStoreResult;
this.slices = slices;
this.scriptType = scriptType;
this.scriptName = scriptName;
this.indexName = indexName;
if (scriptType != null || lang != null || script != null || scriptName != null || params != null) {
this.scriptData = new ScriptData(scriptType, lang, script, scriptName, params);
} else {
this.scriptData = null;
}
}
public String getId() {
@ -122,12 +119,12 @@ public class UpdateQuery {
@Nullable
public String getScript() {
return script;
return scriptData != null ? scriptData.getScript() : null;
}
@Nullable
public Map<String, Object> getParams() {
return params;
return scriptData != null ? scriptData.getParams() : null;
}
@Nullable
@ -142,7 +139,7 @@ public class UpdateQuery {
@Nullable
public String getLang() {
return lang;
return scriptData != null ? scriptData.getLanguage() : null;
}
@Nullable
@ -252,12 +249,12 @@ public class UpdateQuery {
@Nullable
public ScriptType getScriptType() {
return scriptType;
return scriptData != null ? scriptData.getType() : null;
}
@Nullable
public String getScriptName() {
return scriptName;
return scriptData != null ? scriptData.getScriptName() : null;
}
/**
@ -268,13 +265,21 @@ public class UpdateQuery {
return indexName;
}
/**
* @since 4.4
*/
@Nullable
public ScriptData getScriptData() {
return scriptData;
}
public static final class Builder {
private String id;
@Nullable private String script = null;
@Nullable private Map<String, Object> params;
@Nullable private Document document = null;
@Nullable private Document upsert = null;
@Nullable private String lang = "painless";
@Nullable private String lang = null;
@Nullable private String routing = null;
@Nullable private Boolean scriptedUpsert;
@Nullable private Boolean docAsUpsert;

View File

@ -35,6 +35,13 @@ public class UpdateResponse {
this.result = result;
}
/**
* @since 4.4
*/
public static UpdateResponse of(Result result) {
return new UpdateResponse(result);
}
public Result getResult() {
return result;
}

View File

@ -30,6 +30,17 @@ public class Highlight {
private final HighlightParameters parameters;
private final List<HighlightField> fields;
/**
* @since 4.4
*/
public Highlight(List<HighlightField> fields) {
Assert.notNull(fields, "fields must not be null");
this.parameters = HighlightParameters.builder().build();
this.fields = fields;
}
public Highlight(HighlightParameters parameters, List<HighlightField> fields) {
Assert.notNull(parameters, "parameters must not be null");

View File

@ -25,6 +25,17 @@ public class HighlightField {
private final String name;
private final HighlightFieldParameters parameters;
/**
* @since 4.4
*/
public HighlightField(String name) {
Assert.notNull(name, "name must not be null");
this.name = name;
this.parameters = HighlightFieldParameters.builder().build();
}
public HighlightField(String name, HighlightFieldParameters parameters) {
Assert.notNull(name, "name must not be null");

View File

@ -1,4 +1,4 @@
Spring Data Elasticsearch 4.4 M3 (2021.2.0)
Spring Data Elasticsearch 4.4 RC1 (2021.2.0)
Copyright (c) [2013-2021] Pivotal Software, Inc.
This product is licensed to you under the Apache License, Version 2.0 (the "License").
@ -32,6 +32,8 @@ conditions of the subcomponent's license, as noted in the LICENSE file.

View File

@ -0,0 +1,57 @@
/*
// * Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch;
import static org.springframework.data.elasticsearch.client.elc.QueryBuilders.*;
import co.elastic.clients.elasticsearch._types.aggregations.Aggregation;
import org.springframework.data.elasticsearch.client.elc.NativeQuery;
import org.springframework.data.elasticsearch.client.elc.QueryBuilders;
import org.springframework.data.elasticsearch.core.query.BaseQueryBuilder;
import org.springframework.data.elasticsearch.core.query.Query;
/**
* Class providing some queries for the new Elasticsearch client needed in different tests.
*
* @author Peter-Josef Meisch
* @since 4.4
*/
public final class ELCQueries {
private ELCQueries() {}
public static Query getTermsAggsQuery(String aggsName, String aggsField){
return NativeQuery.builder() //
.withQuery(QueryBuilders.matchAllQueryAsQuery()) //
.withAggregation(aggsName, Aggregation.of(a -> a //
.terms(ta -> ta.field(aggsField)))) //
.withMaxResults(0) //
.build();
}
public static Query queryWithIds(String... ids) {
return NativeQuery.builder().withIds(ids).build();
}
public static BaseQueryBuilder<?, ?> getBuilderWithMatchAllQuery() {
return NativeQuery.builder().withQuery(matchAllQueryAsQuery());
}
public static BaseQueryBuilder<?, ?> getBuilderWithTermQuery(String field, String value) {
return NativeQuery.builder().withQuery(termQueryAsQuery(field, value));
}
}

View File

@ -18,16 +18,29 @@ package org.springframework.data.elasticsearch.core;
import static org.springframework.data.elasticsearch.client.elc.QueryBuilders.*;
import co.elastic.clients.elasticsearch._types.query_dsl.BoolQuery;
import co.elastic.clients.elasticsearch._types.query_dsl.FunctionBoostMode;
import co.elastic.clients.elasticsearch._types.query_dsl.FunctionScoreMode;
import co.elastic.clients.elasticsearch.core.search.FieldCollapse;
import co.elastic.clients.json.JsonData;
import java.util.Map;
import org.junit.jupiter.api.DisplayName;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.data.elasticsearch.ELCQueries;
import org.springframework.data.elasticsearch.client.elc.NativeQuery;
import org.springframework.data.elasticsearch.client.elc.NativeQueryBuilder;
import org.springframework.data.elasticsearch.core.query.BaseQueryBuilder;
import org.springframework.data.elasticsearch.core.query.FetchSourceFilterBuilder;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.RescorerQuery;
import org.springframework.data.elasticsearch.core.query.ScriptData;
import org.springframework.data.elasticsearch.core.query.ScriptedField;
import org.springframework.data.elasticsearch.junit.jupiter.ElasticsearchTemplateConfiguration;
import org.springframework.data.elasticsearch.utils.IndexNameProvider;
import org.springframework.lang.Nullable;
import org.springframework.test.context.ContextConfiguration;
/**
@ -54,7 +67,7 @@ public class ElasticsearchELCIntegrationTests extends ElasticsearchIntegrationTe
@Override
protected Query queryWithIds(String... ids) {
return NativeQuery.builder().withIds(ids).build();
return ELCQueries.queryWithIds(ids);
}
@Override
@ -64,7 +77,7 @@ public class ElasticsearchELCIntegrationTests extends ElasticsearchIntegrationTe
@Override
protected BaseQueryBuilder<?, ?> getBuilderWithMatchAllQuery() {
return NativeQuery.builder().withQuery(matchAllQueryAsQuery());
return ELCQueries.getBuilderWithMatchAllQuery();
}
@Override
@ -93,4 +106,101 @@ public class ElasticsearchELCIntegrationTests extends ElasticsearchIntegrationTe
.withMinScore(minScore) //
.build();
}
@Override
protected Query getQueryWithCollapse(String collapseField, @Nullable String innerHits, @Nullable Integer size) {
return NativeQuery.builder() //
.withQuery(matchAllQueryAsQuery()) //
.withFieldCollapse(FieldCollapse.of(fc -> {
fc.field(collapseField);
if (innerHits != null) {
fc.innerHits(ih -> ih.name(innerHits).size(size));
}
return fc;
})).build();
}
@Override
protected Query getMatchAllQueryWithFilterForId(String id) {
return NativeQuery.builder() //
.withQuery(matchAllQueryAsQuery()) //
.withFilter(termQueryAsQuery("id", id)) //
.build();
}
@Override
protected Query getQueryForParentId(String type, String id, @Nullable String route) {
NativeQueryBuilder queryBuilder = NativeQuery.builder() //
.withQuery(qb -> qb //
.parentId(p -> p.type(type).id(id)) //
);
if (route != null) {
queryBuilder.withRoute(route);
}
return queryBuilder.build();
}
@Override
protected Query getMatchAllQueryWithIncludesAndInlineExpressionScript(@Nullable String includes, String fieldName,
String script, Map<String, Object> params) {
NativeQueryBuilder nativeQueryBuilder = NativeQuery.builder().withQuery(matchAllQueryAsQuery());
if (includes != null) {
nativeQueryBuilder.withSourceFilter(new FetchSourceFilterBuilder().withIncludes(includes).build());
}
return nativeQueryBuilder.withScriptedField(new ScriptedField( //
fieldName, //
new ScriptData(ScriptType.INLINE, "expression", script, null, params))) //
.build();
}
@Override
protected Query getQueryWithRescorer() {
return NativeQuery.builder() //
.withQuery(q -> q //
.bool(b -> b //
.filter(f -> f.exists(e -> e.field("rate"))) //
.should(s -> s.term(t -> t.field("message").value("message"))) //
)) //
.withResorerQuery( //
new RescorerQuery(NativeQuery.builder() //
.withQuery(q -> q //
.functionScore(fs -> fs //
.functions(f1 -> f1 //
.filter(matchAllQueryAsQuery()) //
.weight(1.0) //
.gauss(d -> d //
.field("rate") //
.placement(dp -> dp //
.origin(JsonData.of(0)) //
.scale(JsonData.of(10)) //
.decay(0.5)) //
)) //
.functions(f2 -> f2 //
.filter(matchAllQueryAsQuery()).weight(100.0) //
.gauss(d -> d //
.field("rate") //
.placement(dp -> dp //
.origin(JsonData.of(0)) //
.scale(JsonData.of(10)) //
.decay(0.5)) //
)) //
.scoreMode(FunctionScoreMode.Sum) //
.maxBoost(80.0) //
.boostMode(FunctionBoostMode.Replace)) //
) //
.build() //
) //
.withScoreMode(RescorerQuery.ScoreMode.Max) //
.withWindowSize(100)) //
.build();
}
}

View File

@ -27,8 +27,17 @@ import java.util.Map;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.lucene.search.function.CombineFunction;
import org.elasticsearch.common.lucene.search.function.FunctionScoreQuery;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.InnerHitBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder;
import org.elasticsearch.index.query.functionscore.GaussDecayFunctionBuilder;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.join.query.ParentIdQueryBuilder;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.collapse.CollapseBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.json.JSONException;
import org.junit.jupiter.api.DisplayName;
@ -38,12 +47,16 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.BaseQueryBuilder;
import org.springframework.data.elasticsearch.core.query.FetchSourceFilterBuilder;
import org.springframework.data.elasticsearch.core.query.IndicesOptions;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.RescorerQuery;
import org.springframework.data.elasticsearch.core.query.ScriptField;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.data.elasticsearch.junit.jupiter.ElasticsearchRestTemplateConfiguration;
import org.springframework.data.elasticsearch.utils.IndexNameProvider;
import org.springframework.lang.Nullable;
import org.springframework.test.context.ContextConfiguration;
/**
@ -115,6 +128,79 @@ public class ElasticsearchERHLCIntegrationTests extends ElasticsearchIntegration
.withMinScore(minScore).build();
}
@Override
protected Query getQueryWithCollapse(String collapseField, @Nullable String innerHits, @Nullable Integer size) {
CollapseBuilder collapseBuilder = new CollapseBuilder(collapseField);
if (innerHits != null) {
InnerHitBuilder innerHitBuilder = new InnerHitBuilder(innerHits);
if (size != null) {
innerHitBuilder.setSize(size);
}
collapseBuilder.setInnerHits(innerHitBuilder);
}
return new NativeSearchQueryBuilder().withQuery(matchAllQuery()).withCollapseBuilder(collapseBuilder).build();
}
@Override
protected Query getMatchAllQueryWithFilterForId(String id) {
return new NativeSearchQueryBuilder().withQuery(matchAllQuery()).withFilter(boolQuery().filter(termQuery("id", id)))
.build();
}
@Override
protected Query getQueryForParentId(String type, String id, @Nullable String route) {
NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder()
.withQuery(new ParentIdQueryBuilder(type, id));
if (route != null) {
queryBuilder.withRoute(route);
}
return queryBuilder.build();
}
@Override
protected Query getMatchAllQueryWithIncludesAndInlineExpressionScript(@Nullable String includes, String fieldName,
String script, Map<String, java.lang.Object> params) {
NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder().withQuery(matchAllQuery());
if (includes != null) {
nativeSearchQueryBuilder.withSourceFilter(new FetchSourceFilterBuilder().withIncludes(includes).build());
}
return nativeSearchQueryBuilder.withScriptField(new ScriptField(fieldName,
new Script(org.elasticsearch.script.ScriptType.INLINE, "expression", script, params))).build();
}
@Override
protected Query getQueryWithRescorer() {
return new NativeSearchQueryBuilder() //
.withQuery( //
boolQuery() //
.filter(existsQuery("rate")) //
.should(termQuery("message", "message"))) //
.withRescorerQuery( //
new RescorerQuery( //
new NativeSearchQueryBuilder() //
.withQuery(QueryBuilders
.functionScoreQuery(new FunctionScoreQueryBuilder.FilterFunctionBuilder[] {
new FunctionScoreQueryBuilder.FilterFunctionBuilder(
new GaussDecayFunctionBuilder("rate", 0, 10, null, 0.5).setWeight(1f)),
new FunctionScoreQueryBuilder.FilterFunctionBuilder(
new GaussDecayFunctionBuilder("rate", 0, 10, null, 0.5).setWeight(100f)) }) //
.scoreMode(FunctionScoreQuery.ScoreMode.SUM) //
.maxBoost(80f) //
.boostMode(CombineFunction.REPLACE)) //
.build())//
.withScoreMode(RescorerQuery.ScoreMode.Max) //
.withWindowSize(100)) //
.build();
}
@Test // DATAES-768
void shouldUseAllOptionsFromUpdateQuery() {
Map<String, Object> doc = new HashMap<>();

View File

@ -36,19 +36,6 @@ import java.util.stream.IntStream;
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.util.Lists;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.lucene.search.function.CombineFunction;
import org.elasticsearch.common.lucene.search.function.FunctionScoreQuery;
import org.elasticsearch.index.query.InnerHitBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder;
import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder.FilterFunctionBuilder;
import org.elasticsearch.index.query.functionscore.GaussDecayFunctionBuilder;
import org.elasticsearch.join.query.ParentIdQueryBuilder;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.collapse.CollapseBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Order;
@ -82,7 +69,8 @@ import org.springframework.data.elasticsearch.core.index.Settings;
import org.springframework.data.elasticsearch.core.join.JoinField;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.*;
import org.springframework.data.elasticsearch.core.query.RescorerQuery.ScoreMode;
import org.springframework.data.elasticsearch.core.query.highlight.Highlight;
import org.springframework.data.elasticsearch.core.query.highlight.HighlightField;
import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest;
import org.springframework.data.elasticsearch.utils.IndexNameProvider;
import org.springframework.data.util.StreamUtils;
@ -170,7 +158,23 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
protected abstract BaseQueryBuilder<?, ?> getBuilderWithWildcardQuery(String field, String value);
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
protected abstract Query getQueryWithCollapse(String collapseField, @Nullable String innerHits,
@Nullable Integer size);
protected abstract Query getMatchAllQueryWithFilterForId(String id);
protected abstract Query getQueryForParentId(String type, String id, @Nullable String route);
protected Query getMatchAllQueryWithInlineExpressionScript(String fieldName, String script,
Map<String, java.lang.Object> params) {
return getMatchAllQueryWithIncludesAndInlineExpressionScript(null, fieldName, script, params);
}
protected abstract Query getMatchAllQueryWithIncludesAndInlineExpressionScript(@Nullable String includes,
String fieldName, String script, Map<String, java.lang.Object> params);
protected abstract Query getQueryWithRescorer();
@Test
public void shouldThrowDataAccessExceptionIfDocumentDoesNotExistWhileDoingPartialUpdate() {
@ -451,7 +455,6 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThat(searchHits.getTotalHits()).isEqualTo(2);
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test
public void shouldDoBulkUpdate() {
@ -618,11 +621,9 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThat(operations.count(searchQuery, IndexCoordinates.of("test-index-*"))).isEqualTo(2);
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test
public void shouldFilterSearchResultsForGivenFilter() {
// given
String documentId = nextIdAsString();
SampleEntity sampleEntity = SampleEntity.builder().id(documentId).message("some message")
.version(System.currentTimeMillis()).build();
@ -630,14 +631,11 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
IndexQuery indexQuery = getIndexQuery(sampleEntity);
operations.index(indexQuery, IndexCoordinates.of(indexNameProvider.indexName()));
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery())
.withFilter(boolQuery().filter(termQuery("id", documentId))).build();
Query query = getMatchAllQueryWithFilterForId(documentId);
// when
SearchHits<SampleEntity> searchHits = operations.search(searchQuery, SampleEntity.class,
SearchHits<SampleEntity> searchHits = operations.search(query, SampleEntity.class,
IndexCoordinates.of(indexNameProvider.indexName()));
// then
assertThat(searchHits.getTotalHits()).isEqualTo(1);
}
@ -825,7 +823,6 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThat(searchHits.getTotalHits()).isEqualTo(1);
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test
public void shouldUseScriptedFields() {
@ -847,10 +844,8 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
params.put("factor", 2);
// when
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).withScriptField(
new ScriptField("scriptedRate", new Script(ScriptType.INLINE, "expression", "doc['rate'] * factor", params)))
.build();
SearchHits<SampleEntity> searchHits = operations.search(searchQuery, SampleEntity.class,
Query query = getMatchAllQueryWithInlineExpressionScript("scriptedRate", "doc['rate'] * factor", params);
SearchHits<SampleEntity> searchHits = operations.search(query, SampleEntity.class,
IndexCoordinates.of(indexNameProvider.indexName()));
// then
@ -1502,7 +1497,6 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThat(indexOperations.exists()).isFalse();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test
public void shouldDoPartialUpdateForExistingDocument() {
@ -1534,10 +1528,9 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThat(indexedEntity.getMessage()).isEqualTo(messageAfterUpdate);
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test
void shouldDoUpdateByQueryForExistingDocument() {
// given
final String documentId = nextIdAsString();
final String messageBeforeUpdate = "some test message";
final String messageAfterUpdate = "test message";
@ -1549,7 +1542,7 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
operations.index(indexQuery, IndexCoordinates.of(indexNameProvider.indexName()));
final NativeSearchQuery query = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build();
final Query query = operations.matchAllQuery();
final UpdateQuery updateQuery = UpdateQuery.builder(query)
.withScriptType(org.springframework.data.elasticsearch.core.ScriptType.INLINE)
@ -1557,42 +1550,15 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
.withParams(Collections.singletonMap("newMessage", messageAfterUpdate)).withAbortOnVersionConflict(true)
.build();
// when
operations.updateByQuery(updateQuery, IndexCoordinates.of(indexNameProvider.indexName()));
// then
SampleEntity indexedEntity = operations.get(documentId, SampleEntity.class,
IndexCoordinates.of(indexNameProvider.indexName()));
assertThat(indexedEntity.getMessage()).isEqualTo(messageAfterUpdate);
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-227
public void shouldUseUpsertOnUpdate() {
// given
Map<String, Object> doc = new HashMap<>();
doc.put("id", "1");
doc.put("message", "test");
org.springframework.data.elasticsearch.core.document.Document document = org.springframework.data.elasticsearch.core.document.Document
.from(doc);
UpdateQuery updateQuery = UpdateQuery.builder("1") //
.withDocument(document) //
.withUpsert(document) //
.build();
// when
UpdateRequest request = getRequestFactory().updateRequest(updateQuery, IndexCoordinates.of("index"));
// then
assertThat(request).isNotNull();
assertThat(request.upsertRequest()).isNotNull();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test
public void shouldDoUpsertIfDocumentDoesNotExist() {
@ -1650,63 +1616,57 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThat(entities.size()).isGreaterThanOrEqualTo(1);
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@DisabledIf(value = "newElasticsearchClient",
disabledReason = "todo #1973 can't check response, open ES issue 161 that does not allow seqno")
// and version to be set in the request
@Test // DATAES-487
public void shouldReturnSameEntityForMultiSearch() {
// given
List<IndexQuery> indexQueries = new ArrayList<>();
indexQueries.add(buildIndex(SampleEntity.builder().id("1").message("ab").build()));
indexQueries.add(buildIndex(SampleEntity.builder().id("2").message("bc").build()));
indexQueries.add(buildIndex(SampleEntity.builder().id("3").message("ac").build()));
operations.bulkIndex(indexQueries, IndexCoordinates.of(indexNameProvider.indexName()));
List<Query> queries = new ArrayList<>();
queries.add(getTermQuery("message", "ab"));
queries.add(getTermQuery("message", "bc"));
queries.add(getTermQuery("message", "ac"));
// when
List<NativeSearchQuery> queries = new ArrayList<>();
queries.add(new NativeSearchQueryBuilder().withQuery(termQuery("message", "ab")).build());
queries.add(new NativeSearchQueryBuilder().withQuery(termQuery("message", "bc")).build());
queries.add(new NativeSearchQueryBuilder().withQuery(termQuery("message", "ac")).build());
// then
List<SearchHits<SampleEntity>> searchHits = operations.multiSearch(queries, SampleEntity.class,
IndexCoordinates.of(indexNameProvider.indexName()));
for (SearchHits<SampleEntity> sampleEntity : searchHits) {
assertThat(sampleEntity.getTotalHits()).isEqualTo(1);
}
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@DisabledIf(value = "newElasticsearchClient",
disabledReason = "todo #1973 can't check response, open ES issue 161 that does not allow seqno")
// and version to be set in the request
@Test // DATAES-487
public void shouldReturnDifferentEntityForMultiSearch() {
// given
Class<Book> clazz = Book.class;
IndexOperations bookIndexOperations = operations.indexOps(Book.class);
bookIndexOperations.delete();
bookIndexOperations.create();
indexOperations.putMapping(clazz);
bookIndexOperations.createWithMapping();
bookIndexOperations.refresh();
IndexCoordinates bookIndex = IndexCoordinates.of("test-index-book-core-template");
IndexCoordinates bookIndex = IndexCoordinates.of("i-need-my-own-index");
operations.index(buildIndex(SampleEntity.builder().id("1").message("ab").build()),
IndexCoordinates.of(indexNameProvider.indexName()));
operations.index(buildIndex(Book.builder().id("2").description("bc").build()), bookIndex);
bookIndexOperations.refresh();
// when
List<NativeSearchQuery> queries = new ArrayList<>();
queries.add(new NativeSearchQueryBuilder().withQuery(termQuery("message", "ab")).build());
queries.add(new NativeSearchQueryBuilder().withQuery(termQuery("description", "bc")).build());
List<Query> queries = new ArrayList<>();
queries.add(getTermQuery("message", "ab"));
queries.add(getTermQuery("description", "bc"));
List<SearchHits<?>> searchHitsList = operations.multiSearch(queries, Lists.newArrayList(SampleEntity.class, clazz),
IndexCoordinates.of(indexNameProvider.indexName(), bookIndex.getIndexName()));
// then
bookIndexOperations.delete();
SearchHits<?> searchHits0 = searchHitsList.get(0);
assertThat(searchHits0.getTotalHits()).isEqualTo(1L);
SearchHit<SampleEntity> searchHit0 = (SearchHit<SampleEntity>) searchHits0.getSearchHit(0);
@ -1950,7 +1910,7 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
@Test
public void shouldIndexSampleEntityWithIndexAtRuntime() {
String indexName = "custom-" + indexNameProvider.indexName();
String indexName = indexNameProvider.indexName() + "-custom";
// given
String documentId = nextIdAsString();
@ -2699,7 +2659,6 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThat(sampleEntities.get(2).getContent().getMessage()).isEqualTo(sampleEntity1.getMessage());
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-593
public void shouldReturnDocumentWithCollapsedField() {
@ -2715,11 +2674,9 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
operations.bulkIndex(indexQueries, IndexCoordinates.of(indexNameProvider.indexName()));
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).withCollapseField("rate")
.build();
Query query = getQueryWithCollapse("rate", null, null);
// when
SearchHits<SampleEntity> searchHits = operations.search(searchQuery, SampleEntity.class,
SearchHits<SampleEntity> searchHits = operations.search(query, SampleEntity.class,
IndexCoordinates.of(indexNameProvider.indexName()));
// then
@ -2730,7 +2687,6 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThat(searchHits.getSearchHit(1).getContent().getMessage()).isEqualTo("message 2");
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // #1493
@DisplayName("should return document with collapse field and inner hits")
public void shouldReturnDocumentWithCollapsedFieldAndInnerHits() {
@ -2747,11 +2703,10 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
operations.bulkIndex(indexQueries, IndexCoordinates.of(indexNameProvider.indexName()));
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery())
.withCollapseBuilder(new CollapseBuilder("rate").setInnerHits(new InnerHitBuilder("innerHits"))).build();
Query query = getQueryWithCollapse("rate", "innerHits", null);
// when
SearchHits<SampleEntity> searchHits = operations.search(searchQuery, SampleEntity.class,
SearchHits<SampleEntity> searchHits = operations.search(query, SampleEntity.class,
IndexCoordinates.of(indexNameProvider.indexName()));
// then
@ -2764,7 +2719,7 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThat(searchHits.getSearchHit(1).getInnerHits("innerHits").getTotalHits()).isEqualTo(1);
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // #1997
@DisplayName("should return document with inner hits size zero")
void shouldReturnDocumentWithInnerHitsSizeZero() {
@ -2777,12 +2732,10 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
operations.bulkIndex(indexQueries, IndexCoordinates.of(indexNameProvider.indexName()));
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery())
.withCollapseBuilder(new CollapseBuilder("rate").setInnerHits(new InnerHitBuilder("innerHits").setSize(0)))
.build();
Query query = getQueryWithCollapse("rate", "innerHits", 0);
// when
SearchHits<SampleEntity> searchHits = operations.search(searchQuery, SampleEntity.class,
SearchHits<SampleEntity> searchHits = operations.search(query, SampleEntity.class,
IndexCoordinates.of(indexNameProvider.indexName()));
// then
@ -2867,7 +2820,7 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
List<Object> sortValues = searchHit.getSortValues();
assertThat(sortValues).hasSize(2);
assertThat(sortValues.get(0)).isInstanceOf(String.class).isEqualTo("thousands");
// transport client returns Long, RestHghlevelClient Integer, new ElasticsearchClient String
// transport client returns Long, RestHighlevelClient Integer, new ElasticsearchClient String
java.lang.Object o = sortValues.get(1);
if (o instanceof Integer) {
Integer i = (Integer) o;
@ -2882,7 +2835,6 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
}
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-715
void shouldReturnHighlightFieldsInSearchHit() {
IndexCoordinates index = IndexCoordinates.of("test-index-highlight-entity-template");
@ -2893,11 +2845,10 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
operations.index(indexQuery, index);
operations.indexOps(index).refresh();
NativeSearchQuery query = new NativeSearchQueryBuilder() //
.withQuery(termQuery("message", "message")) //
.withHighlightFields(new HighlightBuilder.Field("message")) //
Query query = getBuilderWithTermQuery("message", "message") //
.withHighlightQuery(
new HighlightQuery(new Highlight(singletonList(new HighlightField("message"))), HighlightEntity.class))
.build();
SearchHits<HighlightEntity> searchHits = operations.search(query, HighlightEntity.class, index);
assertThat(searchHits).isNotNull();
@ -2910,10 +2861,9 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThat(highlightField.get(1)).contains("<em>message</em>");
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // #1686
void shouldRunRescoreQueryInSearchQuery() {
IndexCoordinates index = IndexCoordinates.of("test-index-rescore-entity-template");
IndexCoordinates index = IndexCoordinates.of(indexNameProvider.getPrefix() + "rescore-entity");
// matches main query better
SampleEntity entity = SampleEntity.builder() //
@ -2933,17 +2883,7 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
operations.bulkIndex(indexQueries, index);
NativeSearchQuery query = new NativeSearchQueryBuilder() //
.withQuery(boolQuery().filter(existsQuery("rate")).should(termQuery("message", "message"))) //
.withRescorerQuery(
new RescorerQuery(new NativeSearchQueryBuilder().withQuery(QueryBuilders
.functionScoreQuery(new FunctionScoreQueryBuilder.FilterFunctionBuilder[] {
new FilterFunctionBuilder(new GaussDecayFunctionBuilder("rate", 0, 10, null, 0.5).setWeight(1f)),
new FilterFunctionBuilder(
new GaussDecayFunctionBuilder("rate", 0, 10, null, 0.5).setWeight(100f)) })
.scoreMode(FunctionScoreQuery.ScoreMode.SUM).maxBoost(80f).boostMode(CombineFunction.REPLACE)).build())
.withScoreMode(ScoreMode.Max).withWindowSize(100))
.build();
Query query = getQueryWithRescorer();
SearchHits<SampleEntity> searchHits = operations.search(query, SampleEntity.class, index);
@ -2956,6 +2896,7 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThat(searchHit.getScore()).isEqualTo(80f);
}
@Test
// DATAES-738
void shouldSaveEntityWithIndexCoordinates() {
@ -3128,7 +3069,9 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThatSeqNoPrimaryTermIsFilled(retrieved);
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@DisabledIf(value = "newElasticsearchClient",
disabledReason = "todo #1973 can't check response, open ES issue 161 that does not allow seqno")
// and version to be set in the request
@Test // DATAES-799
void multiSearchShouldReturnSeqNoPrimaryTerm() {
OptimisticEntity original = new OptimisticEntity();
@ -3158,7 +3101,6 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThatSeqNoPrimaryTermIsFilled(retrieved);
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-799
void shouldThrowOptimisticLockingFailureExceptionWhenConcurrentUpdateOccursOnEntityWithSeqNoPrimaryTermProperty() {
OptimisticEntity original = new OptimisticEntity();
@ -3175,7 +3117,6 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThatThrownBy(() -> operations.save(forEdit2)).isInstanceOf(OptimisticLockingFailureException.class);
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-799
void shouldThrowOptimisticLockingFailureExceptionWhenConcurrentUpdateOccursOnVersionedEntityWithSeqNoPrimaryTermProperty() {
OptimisticAndVersionedEntity original = new OptimisticAndVersionedEntity();
@ -3204,7 +3145,6 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
operations.save(forEdit);
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test
void shouldSupportCRUDOpsForEntityWithJoinFields() throws Exception {
String qId1 = java.util.UUID.randomUUID().toString();
@ -3258,9 +3198,8 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
operations.save(
Arrays.asList(sampleQuestionEntity1, sampleQuestionEntity2, sampleAnswerEntity1, sampleAnswerEntity2), index);
SearchHits<SampleJoinEntity> hits = operations.search(
new NativeSearchQueryBuilder().withQuery(new ParentIdQueryBuilder("answer", qId1)).build(),
SampleJoinEntity.class);
Query query = getQueryForParentId("answer", qId1, null);
SearchHits<SampleJoinEntity> hits = operations.search(query, SampleJoinEntity.class);
List<String> hitIds = hits.getSearchHits().stream()
.map(sampleJoinEntitySearchHit -> sampleJoinEntitySearchHit.getId()).collect(Collectors.toList());
@ -3287,8 +3226,7 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
// when
operations.bulkUpdate(queries, IndexCoordinates.of(indexNameProvider.indexName()));
SearchHits<SampleJoinEntity> updatedHits = operations.search(
new NativeSearchQueryBuilder().withQuery(new ParentIdQueryBuilder("answer", qId2)).build(),
SearchHits<SampleJoinEntity> updatedHits = operations.search(getQueryForParentId("answer", qId2, null),
SampleJoinEntity.class);
List<String> hitIds = updatedHits.getSearchHits().stream().map(new Function<SearchHit<SampleJoinEntity>, String>() {
@ -3300,8 +3238,7 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThat(hitIds.size()).isEqualTo(1);
assertThat(hitIds.get(0)).isEqualTo(aId2);
updatedHits = operations.search(
new NativeSearchQueryBuilder().withQuery(new ParentIdQueryBuilder("answer", qId1)).build(),
updatedHits = operations.search(getQueryForParentId("answer", qId1, null),
SampleJoinEntity.class);
hitIds = updatedHits.getSearchHits().stream().map(new Function<SearchHit<SampleJoinEntity>, String>() {
@ -3315,20 +3252,15 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
}
private void shouldDeleteEntityWithJoinFields(String qId2, String aId2) throws Exception {
Query query = new NativeSearchQueryBuilder().withQuery(new ParentIdQueryBuilder("answer", qId2)).withRoute(qId2)
.build();
operations.delete(query, SampleJoinEntity.class, IndexCoordinates.of(indexNameProvider.indexName()));
SearchHits<SampleJoinEntity> deletedHits = operations.search(
new NativeSearchQueryBuilder().withQuery(new ParentIdQueryBuilder("answer", qId2)).build(),
operations.delete(getQueryForParentId("answer", qId2, qId2), SampleJoinEntity.class,
IndexCoordinates.of(indexNameProvider.indexName()));
SearchHits<SampleJoinEntity> deletedHits = operations.search(getQueryForParentId("answer", qId2, null),
SampleJoinEntity.class);
List<String> hitIds = deletedHits.getSearchHits().stream().map(new Function<SearchHit<SampleJoinEntity>, String>() {
@Override
public String apply(SearchHit<SampleJoinEntity> sampleJoinEntitySearchHit) {
return sampleJoinEntitySearchHit.getId();
}
}).collect(Collectors.toList());
List<String> hitIds = deletedHits.getSearchHits().stream()
.map(sampleJoinEntitySearchHit -> sampleJoinEntitySearchHit.getId()).collect(Collectors.toList());
assertThat(hitIds.size()).isEqualTo(0);
}
@ -3539,7 +3471,6 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThat(retrieved).isEqualTo(saved);
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // #1488
@DisplayName("should set scripted fields on immutable objects")
void shouldSetScriptedFieldsOnImmutableObjects() {
@ -3549,13 +3480,10 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
Map<String, Object> params = new HashMap<>();
params.put("factor", 2);
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery())
.withSourceFilter(new FetchSourceFilterBuilder().withIncludes("*").build())
.withScriptField(new ScriptField("scriptedRate",
new Script(ScriptType.INLINE, "expression", "doc['rate'] * factor", params)))
.build();
Query query = getMatchAllQueryWithIncludesAndInlineExpressionScript("*", "scriptedRate", "doc['rate'] * factor",
params);
SearchHits<ImmutableWithScriptedEntity> searchHits = operations.search(searchQuery,
SearchHits<ImmutableWithScriptedEntity> searchHits = operations.search(query,
ImmutableWithScriptedEntity.class);
assertThat(searchHits.getTotalHits()).isEqualTo(1);
@ -3565,6 +3493,7 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
assertThat(foundEntity.getScriptedRate()).isEqualTo(84.0);
}
@Test // #1893
@DisplayName("should index document from source with version")
void shouldIndexDocumentFromSourceWithVersion() {
@ -3875,7 +3804,7 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
}
}
@Document(indexName = "test-index-book-core-template")
@Document(indexName = "i-need-my-own-index")
static class Book {
@Nullable
@Id private String id;
@ -4419,7 +4348,7 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
}
}
@Document(indexName = "immutable-class")
@Document(indexName = "#{@indexNameProvider.indexName()}-immutable")
private static final class ImmutableEntity {
@Id
@Nullable private final String id;
@ -4477,7 +4406,7 @@ public abstract class ElasticsearchIntegrationTests implements NewElasticsearchC
}
}
@Document(indexName = "immutable-scripted")
@Document(indexName = "#{@indexNameProvider.indexName()}-immutable-scripted")
public static final class ImmutableWithScriptedEntity {
@Id private final String id;
@Field(type = Integer)

View File

@ -15,11 +15,30 @@
*/
package org.springframework.data.elasticsearch.core;
import static org.assertj.core.api.Assertions.*;
import static org.springframework.data.elasticsearch.client.elc.QueryBuilders.*;
import co.elastic.clients.elasticsearch._types.aggregations.Aggregate;
import co.elastic.clients.elasticsearch._types.aggregations.Buckets;
import co.elastic.clients.elasticsearch._types.aggregations.StringTermsAggregate;
import co.elastic.clients.elasticsearch._types.aggregations.StringTermsBucket;
import co.elastic.clients.elasticsearch.core.search.FieldCollapse;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.data.elasticsearch.ELCQueries;
import org.springframework.data.elasticsearch.client.elc.Aggregation;
import org.springframework.data.elasticsearch.client.elc.ElasticsearchAggregation;
import org.springframework.data.elasticsearch.client.elc.NativeQuery;
import org.springframework.data.elasticsearch.core.query.BaseQueryBuilder;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.junit.jupiter.ReactiveElasticsearchTemplateConfiguration;
import org.springframework.data.elasticsearch.utils.IndexNameProvider;
import org.springframework.lang.Nullable;
import org.springframework.test.context.ContextConfiguration;
/**
@ -37,9 +56,66 @@ public class ReactiveElasticsearchELCIntegrationTests extends ReactiveElasticsea
return new IndexNameProvider("reactive-template");
}
}
@Override
protected Query getTermsAggsQuery(String aggsName, String aggsField) {
return ELCQueries.getTermsAggsQuery(aggsName, aggsField);
}
@Override
public boolean usesNewElasticsearchClient() {
return true;
protected BaseQueryBuilder<?, ?> getBuilderWithMatchAllQuery() {
return ELCQueries.getBuilderWithMatchAllQuery();
}
@Override
protected BaseQueryBuilder<?, ?> getBuilderWithTermQuery(String field, String value) {
return ELCQueries.getBuilderWithTermQuery(field, value);
}
@Override
protected Query getQueryWithCollapse(String collapseField, @Nullable String innerHits, @Nullable Integer size) {
return NativeQuery.builder() //
.withQuery(matchAllQueryAsQuery()) //
.withFieldCollapse(FieldCollapse.of(fc -> {
fc.field(collapseField);
if (innerHits != null) {
fc.innerHits(ih -> ih.name(innerHits).size(size));
}
return fc;
})).build();
}
@Override
protected Query queryWithIds(String... ids) {
return ELCQueries.queryWithIds(ids);
}
@Override
protected <A extends AggregationContainer<?>> void assertThatAggregationsAreCorrect(A aggregationContainer) {
Aggregation aggregation = ((ElasticsearchAggregation) aggregationContainer).aggregation();
assertThat(aggregation.getName()).isEqualTo("messages");
Aggregate aggregate = aggregation.getAggregate();
assertThat(aggregate.isSterms()).isTrue();
StringTermsAggregate parsedStringTerms = (StringTermsAggregate) aggregate.sterms();
Buckets<StringTermsBucket> buckets = parsedStringTerms.buckets();
assertThat(buckets.isArray()).isTrue();
List<StringTermsBucket> bucketList = buckets.array();
assertThat(bucketList.size()).isEqualTo(3);
AtomicInteger count = new AtomicInteger();
bucketList.forEach(stringTermsBucket -> {
if ("message".equals(stringTermsBucket.key())) {
count.getAndIncrement();
assertThat(stringTermsBucket.docCount()).isEqualTo(3);
}
if ("some".equals(stringTermsBucket.key())) {
count.getAndIncrement();
assertThat(stringTermsBucket.docCount()).isEqualTo(2);
}
if ("other".equals(stringTermsBucket.key())) {
count.getAndIncrement();
assertThat(stringTermsBucket.docCount()).isEqualTo(1);
}
});
assertThat(count.get()).isEqualTo(3);
}
}

View File

@ -15,11 +15,23 @@
*/
package org.springframework.data.elasticsearch.core;
import static org.assertj.core.api.Assertions.*;
import static org.elasticsearch.index.query.QueryBuilders.*;
import org.elasticsearch.index.query.InnerHitBuilder;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.collapse.CollapseBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.data.elasticsearch.core.query.BaseQueryBuilder;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.junit.jupiter.ReactiveElasticsearchRestTemplateConfiguration;
import org.springframework.data.elasticsearch.utils.IndexNameProvider;
import org.springframework.lang.Nullable;
import org.springframework.test.context.ContextConfiguration;
/**
@ -37,4 +49,55 @@ public class ReactiveElasticsearchERHLCIntegrationTests extends ReactiveElastics
return new IndexNameProvider("reactive-template-es7");
}
}
@Override
protected Query getTermsAggsQuery(String aggsName, String aggsField) {
return new NativeSearchQueryBuilder().withQuery(matchAllQuery())
.addAggregation(AggregationBuilders.terms("messages").field("message")).build();
}
@Override
protected BaseQueryBuilder<?, ?> getBuilderWithMatchAllQuery() {
return new NativeSearchQueryBuilder().withQuery(matchAllQuery());
}
@Override
protected BaseQueryBuilder<?, ?> getBuilderWithTermQuery(String field, String value) {
return new NativeSearchQueryBuilder().withQuery(termQuery(field, value));
}
@Override
protected Query getQueryWithCollapse(String collapseField, @Nullable String innerHits, @Nullable Integer size) {
CollapseBuilder collapseBuilder = new CollapseBuilder(collapseField);
if (innerHits != null) {
InnerHitBuilder innerHitBuilder = new InnerHitBuilder(innerHits);
if (size != null) {
innerHitBuilder.setSize(size);
}
collapseBuilder.setInnerHits(innerHitBuilder);
}
return new NativeSearchQueryBuilder().withQuery(matchAllQuery()).withCollapseBuilder(collapseBuilder).build();
}
@Override
protected Query queryWithIds(String... ids) {
return new NativeSearchQueryBuilder().withIds(ids).build();
}
@Override
protected <A extends AggregationContainer<?>> void assertThatAggregationsAreCorrect(A aggregationContainer) {
Aggregation aggregation = (Aggregation) aggregationContainer.aggregation();
assertThat(aggregation.getName()).isEqualTo("messages");
assertThat(aggregation instanceof ParsedStringTerms);
ParsedStringTerms parsedStringTerms = (ParsedStringTerms) aggregation;
assertThat(parsedStringTerms.getBuckets().size()).isEqualTo(3);
assertThat(parsedStringTerms.getBucketByKey("message").getDocCount()).isEqualTo(3);
assertThat(parsedStringTerms.getBucketByKey("some").getDocCount()).isEqualTo(2);
assertThat(parsedStringTerms.getBucketByKey("other").getDocCount()).isEqualTo(1);
}
}

View File

@ -24,9 +24,9 @@ import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.lang.Boolean;
import java.lang.Integer;
import java.lang.Long;
import java.lang.Object;
import java.net.ConnectException;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
@ -40,36 +40,26 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.json.JSONException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIf;
import org.skyscreamer.jsonassert.JSONAssert;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Version;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.NewElasticsearchClientDevelopment;
import org.springframework.data.elasticsearch.RestStatusException;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import org.springframework.data.elasticsearch.annotations.Mapping;
import org.springframework.data.elasticsearch.annotations.Setting;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.core.document.Explanation;
import org.springframework.data.elasticsearch.core.index.AliasAction;
import org.springframework.data.elasticsearch.core.index.AliasActionParameters;
@ -98,7 +88,7 @@ import org.springframework.util.StringUtils;
*/
@SuppressWarnings("SpringJavaAutowiredMembersInspection")
@SpringIntegrationTest
public abstract class ReactiveElasticsearchIntegrationTests implements NewElasticsearchClientDevelopment {
public abstract class ReactiveElasticsearchIntegrationTests {
@Autowired private ReactiveElasticsearchOperations operations;
@Autowired private IndexNameProvider indexNameProvider;
@ -118,28 +108,18 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
}
// endregion
protected abstract Query getTermsAggsQuery(String aggsName, String aggsField);
protected abstract BaseQueryBuilder<?, ?> getBuilderWithMatchAllQuery();
protected abstract BaseQueryBuilder<?, ?> getBuilderWithTermQuery(String field, String value);
protected abstract Query getQueryWithCollapse(String collapseField, @Nullable String innerHits,
@Nullable Integer size);
protected abstract Query queryWithIds(String... ids);
// region Tests
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-504
public void executeShouldProvideResource() {
Mono.from(operations.execute(ReactiveElasticsearchClient::ping)) //
.as(StepVerifier::create) //
.expectNext(true) //
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-504
public void executeShouldConvertExceptions() {
Mono.from(operations.execute(client -> {
throw new RuntimeException(new ConnectException("we're doomed"));
})) //
.as(StepVerifier::create) //
.expectError(DataAccessResourceFailureException.class) //
.verify();
}
@Test // DATAES-504
public void insertWithIdShouldWork() {
@ -157,7 +137,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-504
public void insertWithAutogeneratedIdShouldUpdateEntityId() {
@ -175,7 +154,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
return operations.exists(id, IndexCoordinates.of(index));
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-504
public void insertWithExplicitIndexNameShouldOverwriteMetadata() {
@ -290,7 +268,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-519
public void existsShouldReturnFalseWhenIndexDoesNotExist() {
@ -300,7 +277,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-504
public void existsShouldReturnTrueWhenFound() {
@ -313,7 +289,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-504
public void existsShouldReturnFalseWhenNotFound() {
@ -432,7 +407,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-595, DATAES-767
public void shouldThrowDataAccessExceptionWhenInvalidPreferenceForGivenCriteria() {
@ -445,6 +419,8 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
CriteriaQuery queryWithInvalidPreference = new CriteriaQuery(
new Criteria("message").contains("some").and("message").contains("message"));
queryWithInvalidPreference.setPreference("_only_nodes:oops");
// add a pageable to not use scrolling,otherwise the exception class does not match
queryWithInvalidPreference.setPageable(PageRequest.of(0, 10));
operations.search(queryWithInvalidPreference, SampleEntity.class) //
.as(StepVerifier::create) //
@ -498,7 +474,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-567
public void aggregateShouldReturnAggregations() {
@ -508,24 +483,17 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
index(sampleEntity1, sampleEntity2, sampleEntity3);
NativeSearchQuery query = new NativeSearchQueryBuilder().withQuery(matchAllQuery())
.addAggregation(AggregationBuilders.terms("messages").field("message")).build();
Query query = getTermsAggsQuery("messages", "message");
operations.aggregate(query, SampleEntity.class) //
.as(StepVerifier::create) //
.consumeNextWith(aggregationContainer -> {
Aggregation aggregation = ((ElasticsearchAggregation) aggregationContainer).aggregation();
assertThat(aggregation.getName()).isEqualTo("messages");
assertThat(aggregation instanceof ParsedStringTerms);
ParsedStringTerms parsedStringTerms = (ParsedStringTerms) aggregation;
assertThat(parsedStringTerms.getBuckets().size()).isEqualTo(3);
assertThat(parsedStringTerms.getBucketByKey("message").getDocCount()).isEqualTo(3);
assertThat(parsedStringTerms.getBucketByKey("some").getDocCount()).isEqualTo(2);
assertThat(parsedStringTerms.getBucketByKey("other").getDocCount()).isEqualTo(1);
assertThatAggregationsAreCorrect(aggregationContainer);
}).verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
protected abstract <A extends AggregationContainer<?>> void assertThatAggregationsAreCorrect(A aggregationContainer);
@Test // DATAES-567, DATAES-767
public void aggregateShouldErrorWhenIndexDoesNotExist() {
operations
@ -621,7 +589,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-519
public void deleteByQueryShouldReturnZeroWhenIndexDoesNotExist() {
@ -634,7 +601,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
}).verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-547
public void shouldDeleteAcrossIndex() {
@ -650,11 +616,9 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
operations.indexOps(thisIndex).refresh().then(operations.indexOps(thatIndex).refresh()).block();
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder() //
.withQuery(termQuery("message", "test")) //
.build();
Query query = getBuilderWithTermQuery("message", "test").build();
operations.delete(searchQuery, SampleEntity.class, IndexCoordinates.of(indexPrefix + '*')) //
operations.delete(query, SampleEntity.class, IndexCoordinates.of(indexPrefix + '*')) //
.map(ByQueryResponse::getDeleted) //
.as(StepVerifier::create) //
.expectNext(2L) //
@ -663,7 +627,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
operations.indexOps(thisIndex).delete().then(operations.indexOps(thatIndex).delete()).block();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-547
public void shouldDeleteAcrossIndexWhenNoMatchingDataPresent() {
@ -679,11 +642,9 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
operations.indexOps(thisIndex).refresh().then(operations.indexOps(thatIndex).refresh()).block();
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder() //
.withQuery(termQuery("message", "negative")) //
.build();
Query query = getBuilderWithTermQuery("message", "negative").build();
operations.delete(searchQuery, SampleEntity.class, IndexCoordinates.of(indexPrefix + '*')) //
operations.delete(query, SampleEntity.class, IndexCoordinates.of(indexPrefix + '*')) //
.map(ByQueryResponse::getDeleted) //
.as(StepVerifier::create) //
.expectNext(0L) //
@ -692,7 +653,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
operations.indexOps(thisIndex).delete().then(operations.indexOps(thatIndex).delete()).block();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-504
public void deleteByQueryShouldReturnNumberOfDeletedDocuments() {
@ -707,7 +667,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-504
public void deleteByQueryShouldReturnZeroIfNothingDeleted() {
@ -722,7 +681,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-593
public void shouldReturnDocumentWithCollapsedField() {
@ -734,28 +692,20 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
entity3.setRate(1);
index(entity1, entity2, entity3);
NativeSearchQuery query = new NativeSearchQueryBuilder() //
.withQuery(matchAllQuery()) //
.withCollapseField("rate") //
.withPageable(PageRequest.of(0, 25)) //
.build();
Query query = getQueryWithCollapse("rate", null, null);
operations.search(query, SampleEntity.class, IndexCoordinates.of(indexNameProvider.indexName())) //
.as(StepVerifier::create) //
.expectNextCount(2) //
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test
void shouldReturnSortFields() {
SampleEntity entity = randomEntity("test message");
entity.rate = 42;
index(entity);
NativeSearchQuery query = new NativeSearchQueryBuilder() //
.withQuery(matchAllQuery()) //
.withSort(new FieldSortBuilder("rate").order(SortOrder.DESC)) //
Query query = getBuilderWithMatchAllQuery().withSort(Sort.by(Sort.Direction.DESC, "rate"))
.build();
operations.search(query, SampleEntity.class) //
@ -763,12 +713,23 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
.consumeNextWith(it -> {
List<Object> sortValues = it.getSortValues();
assertThat(sortValues).hasSize(1);
assertThat(sortValues.get(0)).isEqualTo(42);
// old client returns Integer, new ElasticsearchClient String
java.lang.Object o = sortValues.get(0);
if (o instanceof Integer) {
Integer i = (Integer) o;
assertThat(o).isInstanceOf(Integer.class).isEqualTo(42);
} else if (o instanceof Long) {
Long l = (Long) o;
assertThat(o).isInstanceOf(Long.class).isEqualTo(42L);
} else if (o instanceof String) {
assertThat(o).isInstanceOf(String.class).isEqualTo("42");
} else {
fail("unexpected object type " + o);
}
}) //
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-623, #1678
public void shouldReturnObjectsForGivenIdsUsingMultiGet() {
SampleEntity entity1 = randomEntity("test message 1");
@ -788,7 +749,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-623
public void shouldReturnObjectsForGivenIdsUsingMultiGetWithFields() {
SampleEntity entity1 = randomEntity("test message 1");
@ -809,7 +769,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-623. #1678
public void shouldDoBulkUpdate() {
SampleEntity entity1 = randomEntity("test message 1");
@ -847,7 +806,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
.verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-623
void shouldSaveAll() {
SampleEntity entity1 = randomEntity("test message 1");
@ -858,7 +816,7 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
operations.saveAll(Mono.just(Arrays.asList(entity1, entity2)), IndexCoordinates.of(indexNameProvider.indexName())) //
.then().block();
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build();
Query searchQuery = operations.matchAllQuery();
operations.search(searchQuery, SampleEntity.class, IndexCoordinates.of(indexNameProvider.indexName())) //
.as(StepVerifier::create) //
.expectNextMatches(hit -> entity1.equals(hit.getContent()) || entity2.equals(hit.getContent())) //
@ -891,7 +849,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
assertThat(retrieved.seqNoPrimaryTerm.getPrimaryTerm()).isPositive();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-799, #1678
void multiGetShouldReturnSeqNoPrimaryTerm() {
OptimisticEntity original = new OptimisticEntity();
@ -910,7 +867,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
return new NativeSearchQueryBuilder().withIds(singletonList(id)).build();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-799
void searchShouldReturnSeqNoPrimaryTerm() {
OptimisticEntity original = new OptimisticEntity();
@ -927,10 +883,9 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
}
private Query searchQueryForOne(String id) {
return new NativeSearchQueryBuilder().withFilter(new IdsQueryBuilder().addIds(id)).build();
return queryWithIds(id);
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-799
void shouldThrowOptimisticLockingFailureExceptionWhenConcurrentUpdateOccursOnEntityWithSeqNoPrimaryTermProperty() {
OptimisticEntity original = new OptimisticEntity();
@ -950,7 +905,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
.verify();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-799
void shouldThrowOptimisticLockingFailureExceptionWhenConcurrentUpdateOccursOnVersionedEntityWithSeqNoPrimaryTermProperty() {
OptimisticAndVersionedEntity original = new OptimisticAndVersionedEntity();
@ -979,7 +933,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
operations.save(forEdit).as(StepVerifier::create).expectNextCount(1).verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // DATAES-909
void shouldDoUpdate() {
SampleEntity entity = randomEntity("test message");
@ -1119,7 +1072,6 @@ public abstract class ReactiveElasticsearchIntegrationTests implements NewElasti
}).verifyComplete();
}
@DisabledIf("newElasticsearchClient") // todo #1973 still needs implementation
@Test // #1646, #1718
@DisplayName("should return a list of info for specific index")
void shouldReturnInformationListOfAllIndices() {

View File

@ -85,7 +85,7 @@ public class ReactiveElasticsearchTemplateUnitTests {
.as(StepVerifier::create) //
.verifyComplete();
assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(WriteRequest.RefreshPolicy.IMMEDIATE);
assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(WriteRequest.RefreshPolicy.NONE);
}
@Test // DATAES-504
@ -170,7 +170,7 @@ public class ReactiveElasticsearchTemplateUnitTests {
.as(StepVerifier::create) //
.verifyComplete();
assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(WriteRequest.RefreshPolicy.IMMEDIATE);
assertThat(captor.getValue().getRefreshPolicy()).isEqualTo(WriteRequest.RefreshPolicy.NONE);
}
@Test // DATAES-504
@ -198,7 +198,7 @@ public class ReactiveElasticsearchTemplateUnitTests {
.as(StepVerifier::create) //
.verifyComplete();
assertThat(captor.getValue().isRefresh()).isTrue();
assertThat(captor.getValue().isRefresh()).isFalse();
}
@Test // DATAES-504
@ -207,13 +207,13 @@ public class ReactiveElasticsearchTemplateUnitTests {
ArgumentCaptor<DeleteByQueryRequest> captor = ArgumentCaptor.forClass(DeleteByQueryRequest.class);
when(client.deleteBy(captor.capture())).thenReturn(Mono.empty());
template.setRefreshPolicy(RefreshPolicy.NONE);
template.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
template.delete(new StringQuery(QueryBuilders.matchAllQuery().toString()), Object.class, index) //
.as(StepVerifier::create) //
.verifyComplete();
assertThat(captor.getValue().isRefresh()).isFalse();
assertThat(captor.getValue().isRefresh()).isTrue();
}
@Test // DATAES-504

View File

@ -116,6 +116,30 @@ class RequestFactoryTests {
assertThat(searchRequest.source().from()).isEqualTo(30);
}
@Test // DATAES-227
public void shouldUseUpsertOnUpdate() {
// given
Map<String, Object> doc = new HashMap<>();
doc.put("id", "1");
doc.put("message", "test");
org.springframework.data.elasticsearch.core.document.Document document = org.springframework.data.elasticsearch.core.document.Document
.from(doc);
UpdateQuery updateQuery = UpdateQuery.builder("1") //
.withDocument(document) //
.withUpsert(document) //
.build();
// when
UpdateRequest request = requestFactory.updateRequest(updateQuery, IndexCoordinates.of("index"));
// then
assertThat(request).isNotNull();
assertThat(request.upsertRequest()).isNotNull();
}
@Test // DATAES-693
public void shouldReturnSourceWhenRequested() {
// given

View File

@ -21,11 +21,15 @@ import co.elastic.clients.elasticsearch._types.aggregations.Aggregate;
import co.elastic.clients.elasticsearch._types.aggregations.Aggregation;
import co.elastic.clients.elasticsearch._types.aggregations.StatsBucketAggregate;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.data.elasticsearch.ELCQueries;
import org.springframework.data.elasticsearch.client.elc.ElasticsearchAggregation;
import org.springframework.data.elasticsearch.client.elc.ElasticsearchAggregations;
import org.springframework.data.elasticsearch.client.elc.NativeQuery;
import org.springframework.data.elasticsearch.client.elc.QueryBuilders;
@ -55,18 +59,17 @@ public class AggregationELCIntegrationTests extends AggregationIntegrationTests
@Override
protected Query getTermsAggsQuery(String aggsName, String aggsField) {
return NativeQuery.builder() //
.withQuery(QueryBuilders.matchAllQueryAsQuery()) //
.withAggregation(aggsName, Aggregation.of(a -> a //
.terms(ta -> ta.field(aggsField)))) //
.withMaxResults(0) //
.build();
return ELCQueries.getTermsAggsQuery(aggsName, aggsField);
}
@Override
protected void assertThatAggsHasResult(AggregationsContainer<?> aggregationsContainer, String aggsName) {
Map<String, Aggregate> aggregations = ((ElasticsearchAggregations) aggregationsContainer).aggregations();
assertThat(aggregations).containsKey(aggsName);
List<ElasticsearchAggregation> aggregations = ((ElasticsearchAggregations) aggregationsContainer).aggregations();
List<String> aggNames = aggregations.stream() //
.map(ElasticsearchAggregation::aggregation) //
.map(org.springframework.data.elasticsearch.client.elc.Aggregation::getName) //
.collect(Collectors.toList());
assertThat(aggNames).contains(aggsName);
}
@Override
@ -84,9 +87,13 @@ public class AggregationELCIntegrationTests extends AggregationIntegrationTests
@Override
protected void assertThatPipelineAggsAreCorrect(AggregationsContainer<?> aggregationsContainer, String aggsName,
String pipelineAggsName) {
Map<String, Aggregate> aggregations = ((ElasticsearchAggregations) aggregationsContainer).aggregations();
assertThat(aggregations).containsKey(aggsName);
Aggregate aggregate = aggregations.get(pipelineAggsName);
Map<String, Aggregate> aggregates = ((ElasticsearchAggregations) aggregationsContainer).aggregations().stream() //
.map(ElasticsearchAggregation::aggregation) //
.collect(Collectors.toMap(org.springframework.data.elasticsearch.client.elc.Aggregation::getName,
org.springframework.data.elasticsearch.client.elc.Aggregation::getAggregate));
assertThat(aggregates).containsKey(aggsName);
Aggregate aggregate = aggregates.get(pipelineAggsName);
assertThat(aggregate.isStatsBucket()).isTrue();
StatsBucketAggregate statsBucketAggregate = aggregate.statsBucket();
assertThat(statsBucketAggregate.min()).isEqualTo(1.0);

View File

@ -234,7 +234,7 @@ public abstract class AggregationIntegrationTests {
*/
static class ArticleEntityBuilder {
private ArticleEntity result;
private final ArticleEntity result;
public ArticleEntityBuilder(String id) {
result = new ArticleEntity(id);

View File

@ -116,7 +116,7 @@ public abstract class CompletionIntegrationTests implements NewElasticsearchClie
operations.bulkIndex(indexQueries, AnnotatedCompletionEntity.class);
}
@DisabledIf("newElasticsearchClient") // todo #1973, ES issue 150
@DisabledIf(value = "newElasticsearchClient", disabledReason="todo #1973, ES issue 150")
@Test
public void shouldFindSuggestionsForGivenCriteriaQueryUsingCompletionEntity() {
@ -148,7 +148,7 @@ public abstract class CompletionIntegrationTests implements NewElasticsearchClie
operations.get("1", CompletionEntity.class);
}
@DisabledIf("newElasticsearchClient") // todo #1973, ES issue 150
@DisabledIf(value = "newElasticsearchClient", disabledReason="todo #1973, ES issue 150")
@Test
public void shouldFindSuggestionsForGivenCriteriaQueryUsingAnnotatedCompletionEntity() {
@ -172,7 +172,7 @@ public abstract class CompletionIntegrationTests implements NewElasticsearchClie
assertThat(options.get(1).getText()).isIn("Marchand", "Mohsin");
}
@DisabledIf("newElasticsearchClient") // todo #1973, ES issue 150
@DisabledIf(value = "newElasticsearchClient", disabledReason="todo #1973, ES issue 150")
@Test
public void shouldFindSuggestionsWithWeightsForGivenCriteriaQueryUsingAnnotatedCompletionEntity() {

View File

@ -67,7 +67,7 @@ public abstract class ReactiveSuggestIntegrationTests implements NewElasticsearc
operations.indexOps(IndexCoordinates.of(indexNameProvider.getPrefix() + "*")).delete().block();
}
@DisabledIf("newElasticsearchClient") // todo #1973, ES issue 150
@DisabledIf(value = "newElasticsearchClient", disabledReason="todo #1973, ES issue 150")
@Test // #1302
@DisplayName("should find suggestions for given prefix completion")
void shouldFindSuggestionsForGivenPrefixCompletion() {

View File

@ -0,0 +1,44 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository.support;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.data.elasticsearch.junit.jupiter.ElasticsearchTemplateConfiguration;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
import org.springframework.data.elasticsearch.utils.IndexNameProvider;
import org.springframework.test.context.ContextConfiguration;
/**
* @author Peter-Josef Meisch
* @since 4.4
*/
@ContextConfiguration(classes = {ElasticsearchRepositoryELCIntegrationTests.Config.class })
public class ElasticsearchRepositoryELCIntegrationTests extends ElasticsearchRepositoryIntegrationTests {
@Configuration
@Import({ElasticsearchTemplateConfiguration.class })
@EnableElasticsearchRepositories(basePackages = {"org.springframework.data.elasticsearch.repository.support" },
considerNestedRepositories = true)
static class Config {
@Bean
IndexNameProvider indexNameProvider() {
return new IndexNameProvider("repository");
}
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.repository.support;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.data.elasticsearch.junit.jupiter.ElasticsearchRestTemplateConfiguration;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
import org.springframework.data.elasticsearch.utils.IndexNameProvider;
import org.springframework.test.context.ContextConfiguration;
/**
* @author Peter-Josef Meisch
* @since 4.4
*/
@ContextConfiguration(classes = {ElasticsearchRepositoryERHLCIntegrationTests.Config.class })
public class ElasticsearchRepositoryERHLCIntegrationTests extends ElasticsearchRepositoryIntegrationTests {
@Configuration
@Import({ElasticsearchRestTemplateConfiguration.class })
@EnableElasticsearchRepositories(basePackages = {"org.springframework.data.elasticsearch.repository.support" },
considerNestedRepositories = true)
static class Config {
@Bean
IndexNameProvider indexNameProvider() {
return new IndexNameProvider("repository-es7");
}
}
}

View File

@ -17,12 +17,9 @@ package org.springframework.data.elasticsearch.repository.support;
import static org.assertj.core.api.Assertions.*;
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.springframework.data.elasticsearch.annotations.FieldType.*;
import static org.springframework.data.elasticsearch.utils.IdGenerator.*;
import java.io.IOException;
import java.lang.Long;
import java.lang.Object;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -30,12 +27,10 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.junit.jupiter.api.AfterEach;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Version;
@ -45,18 +40,16 @@ import org.springframework.data.domain.Sort;
import org.springframework.data.domain.Sort.Order;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.IndexOperations;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.junit.jupiter.ElasticsearchRestTemplateConfiguration;
import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
import org.springframework.data.elasticsearch.utils.IndexInitializer;
import org.springframework.data.elasticsearch.utils.IndexNameProvider;
import org.springframework.data.util.StreamUtils;
import org.springframework.lang.Nullable;
import org.springframework.test.context.ContextConfiguration;
/**
* @author Rizwan Idrees
@ -68,29 +61,22 @@ import org.springframework.test.context.ContextConfiguration;
* @author Murali Chevuri
*/
@SpringIntegrationTest
@ContextConfiguration(classes = { SimpleElasticsearchRepositoryIntegrationTests.Config.class })
class SimpleElasticsearchRepositoryIntegrationTests {
@Configuration
@Import({ ElasticsearchRestTemplateConfiguration.class })
@EnableElasticsearchRepositories(basePackages = { "org.springframework.data.elasticsearch.repository.support" },
considerNestedRepositories = true)
static class Config {}
abstract class ElasticsearchRepositoryIntegrationTests {
@Autowired private SampleElasticsearchRepository repository;
@Autowired private ElasticsearchOperations operations;
private IndexOperations indexOperations;
@Autowired private IndexNameProvider indexNameProvider;
@BeforeEach
void before() {
indexOperations = operations.indexOps(SampleEntity.class);
IndexInitializer.init(indexOperations);
indexNameProvider.increment();
operations.indexOps(SampleEntity.class).createWithMapping();
}
@AfterEach
void after() {
indexOperations.delete();
@Test
@org.junit.jupiter.api.Order(Integer.MAX_VALUE)
public void cleanup() {
operations.indexOps(IndexCoordinates.of(indexNameProvider.getPrefix() + "*")).delete();
}
@Test
@ -369,7 +355,7 @@ class SimpleElasticsearchRepositoryIntegrationTests {
repository.deleteAllById(Arrays.asList(id1, id3));
// then
assertThat(repository.findAll()).extracting(SampleEntity::getId).containsExactly(id2);
Assertions.assertThat(repository.findAll()).extracting(SampleEntity::getId).containsExactly(id2);
}
@Test
@ -539,8 +525,8 @@ class SimpleElasticsearchRepositoryIntegrationTests {
repository.deleteAll(sampleEntities);
// then
assertThat(repository.findById(documentId1)).isNotPresent();
assertThat(repository.findById(documentId2)).isNotPresent();
Assertions.assertThat(repository.findById(documentId1)).isNotPresent();
Assertions.assertThat(repository.findById(documentId2)).isNotPresent();
}
@Test
@ -677,14 +663,14 @@ class SimpleElasticsearchRepositoryIntegrationTests {
return sampleEntities;
}
@Document(indexName = "test-index-sample-simple-repository")
@Document(indexName = "#{@indexNameProvider.indexName()}")
static class SampleEntity {
@Nullable
@Id private String id;
@Nullable
@Field(type = Text, store = true, fielddata = true) private String type;
@Field(type = FieldType.Text, store = true, fielddata = true) private String type;
@Nullable
@Field(type = Text, store = true, fielddata = true) private String message;
@Field(type = FieldType.Text, store = true, fielddata = true) private String message;
@Nullable private int rate;
@Nullable private boolean available;
@Nullable

View File

@ -67,6 +67,7 @@ import org.springframework.test.context.ContextConfiguration;
* @author Peter-Josef Meisch
* @author Jens Schauder
*/
// todo #1973 test for both clients
@SpringIntegrationTest
@ContextConfiguration(classes = { SimpleReactiveElasticsearchRepositoryTests.Config.class })
class SimpleReactiveElasticsearchRepositoryTests {

View File

@ -15,7 +15,7 @@
#
#
sde.testcontainers.image-name=docker.elastic.co/elasticsearch/elasticsearch
sde.testcontainers.image-version=7.17.1
sde.testcontainers.image-version=7.17.2
#
#
# needed as we do a DELETE /* at the end of the tests, will be required from 8.0 on, produces a warning since 7.13