Improve delete-by-query returned information.

Original Pull Request #1692
Closes #1679
This commit is contained in:
Peter-Josef Meisch 2021-02-13 16:03:26 +01:00 committed by GitHub
parent ffc2420bcd
commit 3869fd2ee8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 83 additions and 65 deletions

View File

@ -47,3 +47,9 @@ If a refresh policy is set, then it will be used by the repositories as well.
When configuring Spring Data Elasticsearch like described in <<elasticsearch.clients>> by using `ElasticsearchConfigurationSupport`, `AbstractElasticsearchConfiguration` or `AbstractReactiveElasticsearchConfiguration` the refresh policy will be initialized to `null`.
Previously the reactive code initialized this to `IMMEDIATE`, now reactive and non-reactive code show the same behaviour.
=== Method return types
==== delete methods that take a Query
The reactive methods previously returned a `Mono<Long>` with the number of deleted documents, the non reactive versions were void. They now return a `Mono<ByQueryResponse>` which contains much more detailed information about the deleted documents and errors that might have occurred.

View File

@ -115,7 +115,7 @@ import org.springframework.data.elasticsearch.client.reactive.HostProvider.Verif
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices;
import org.springframework.data.elasticsearch.client.util.NamedXContents;
import org.springframework.data.elasticsearch.client.util.ScrollState;
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
import org.springframework.data.util.Lazy;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
@ -524,10 +524,10 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch
}
@Override
public Mono<UpdateByQueryResponse> updateBy(HttpHeaders headers, UpdateByQueryRequest updateRequest) {
public Mono<ByQueryResponse> updateBy(HttpHeaders headers, UpdateByQueryRequest updateRequest) {
return sendRequest(updateRequest, requestCreator.updateByQuery(), BulkByScrollResponse.class, headers) //
.next() //
.map(UpdateByQueryResponse::of);
.map(ByQueryResponse::of);
}
/*

View File

@ -66,7 +66,7 @@ import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.suggest.Suggest;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
import org.springframework.http.HttpHeaders;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
@ -606,7 +606,7 @@ public interface ReactiveElasticsearchClient {
* * Query API on elastic.co</a>
* @return a {@link Mono} emitting operation response.
*/
default Mono<UpdateByQueryResponse> updateBy(Consumer<UpdateByQueryRequest> consumer) {
default Mono<ByQueryResponse> updateBy(Consumer<UpdateByQueryRequest> consumer) {
final UpdateByQueryRequest request = new UpdateByQueryRequest();
consumer.accept(request);
@ -621,7 +621,7 @@ public interface ReactiveElasticsearchClient {
* * Query API on elastic.co</a>
* @return a {@link Mono} emitting operation response.
*/
default Mono<UpdateByQueryResponse> updateBy(UpdateByQueryRequest updateRequest) {
default Mono<ByQueryResponse> updateBy(UpdateByQueryRequest updateRequest) {
return updateBy(HttpHeaders.EMPTY, updateRequest);
}
@ -634,7 +634,7 @@ public interface ReactiveElasticsearchClient {
* * Query API on elastic.co</a>
* @return a {@link Mono} emitting operation response.
*/
Mono<UpdateByQueryResponse> updateBy(HttpHeaders headers, UpdateByQueryRequest updateRequest);
Mono<ByQueryResponse> updateBy(HttpHeaders headers, UpdateByQueryRequest updateRequest);
/**
* Execute a {@link BulkRequest} against the {@literal bulk} API.

View File

@ -53,6 +53,7 @@ import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersiste
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
import org.springframework.data.elasticsearch.core.query.BulkOptions;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder;
import org.springframework.data.elasticsearch.core.query.MoreLikeThisQuery;
@ -275,8 +276,8 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper
}
@Override
public void delete(Query query, Class<?> clazz) {
delete(query, clazz, getIndexCoordinatesFor(clazz));
public ByQueryResponse delete(Query query, Class<?> clazz) {
return delete(query, clazz, getIndexCoordinatesFor(clazz));
}
@Override

View File

@ -19,9 +19,9 @@ import java.util.List;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.BulkOptions;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.data.elasticsearch.core.query.UpdateResponse;
import org.springframework.data.elasticsearch.core.routing.RoutingResolver;
@ -276,19 +276,21 @@ public interface DocumentOperations {
* @param query query defining the objects
* @param clazz The entity class, must be annotated with
* {@link org.springframework.data.elasticsearch.annotations.Document}
* @return response with detailed information
* @since 4.1
*/
void delete(Query query, Class<?> clazz);
ByQueryResponse delete(Query query, Class<?> clazz);
/**
* Delete all records matching the query.
*
*
* @param query query defining the objects
* @param clazz The entity class, must be annotated with
* {@link org.springframework.data.elasticsearch.annotations.Document}
* @param index the index from which to delete
* @return response with detailed information
*/
void delete(Query query, Class<?> clazz, IndexCoordinates index);
ByQueryResponse delete(Query query, Class<?> clazz, IndexCoordinates index);
/**
* Partial update of the document.
@ -307,5 +309,5 @@ public interface DocumentOperations {
* @return the update response
* @since 4.2
*/
UpdateByQueryResponse updateByQuery(UpdateQuery updateQuery, IndexCoordinates index);
ByQueryResponse updateByQuery(UpdateQuery updateQuery, IndexCoordinates index);
}

View File

@ -49,9 +49,9 @@ import org.springframework.data.elasticsearch.core.document.DocumentAdapters;
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.BulkOptions;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.data.elasticsearch.core.query.UpdateResponse;
import org.springframework.data.elasticsearch.support.SearchHitsUtil;
@ -208,9 +208,9 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate {
}
@Override
public void delete(Query query, Class<?> clazz, IndexCoordinates index) {
public ByQueryResponse delete(Query query, Class<?> clazz, IndexCoordinates index) {
DeleteByQueryRequest deleteByQueryRequest = requestFactory.deleteByQueryRequest(query, clazz, index);
execute(client -> client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT));
return ByQueryResponse.of(execute(client -> client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT)));
}
@Override
@ -231,7 +231,7 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate {
}
@Override
public UpdateByQueryResponse updateByQuery(UpdateQuery query, IndexCoordinates index) {
public ByQueryResponse updateByQuery(UpdateQuery query, IndexCoordinates index) {
Assert.notNull(query, "query must not be null");
Assert.notNull(index, "index must not be null");
@ -248,7 +248,7 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate {
final BulkByScrollResponse bulkByScrollResponse = execute(
client -> client.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT));
return UpdateByQueryResponse.of(bulkByScrollResponse);
return ByQueryResponse.of(bulkByScrollResponse);
}
public List<IndexedObjectInformation> doBulkOperation(List<?> queries, BulkOptions bulkOptions,

View File

@ -47,9 +47,9 @@ import org.springframework.data.elasticsearch.core.document.DocumentAdapters;
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.BulkOptions;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.data.elasticsearch.core.query.UpdateResponse;
import org.springframework.data.elasticsearch.support.SearchHitsUtil;
@ -232,8 +232,8 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate {
}
@Override
public void delete(Query query, Class<?> clazz, IndexCoordinates index) {
requestFactory.deleteByQueryRequestBuilder(client, query, clazz, index).get();
public ByQueryResponse delete(Query query, Class<?> clazz, IndexCoordinates index) {
return ByQueryResponse.of(requestFactory.deleteByQueryRequestBuilder(client, query, clazz, index).get());
}
@Override
@ -260,7 +260,7 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate {
}
@Override
public UpdateByQueryResponse updateByQuery(UpdateQuery query, IndexCoordinates index) {
public ByQueryResponse updateByQuery(UpdateQuery query, IndexCoordinates index) {
Assert.notNull(query, "query must not be null");
Assert.notNull(index, "index must not be null");
@ -275,7 +275,7 @@ public class ElasticsearchTemplate extends AbstractElasticsearchTemplate {
// UpdateByQueryRequestBuilder has not parameters to set a routing value
final BulkByScrollResponse bulkByScrollResponse = updateByQueryRequestBuilder.execute().actionGet();
return UpdateByQueryResponse.of(bulkByScrollResponse);
return ByQueryResponse.of(bulkByScrollResponse);
}
public List<IndexedObjectInformation> doBulkOperation(List<?> queries, BulkOptions bulkOptions,

View File

@ -24,8 +24,8 @@ import java.util.List;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.BulkOptions;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.data.elasticsearch.core.query.UpdateResponse;
import org.springframework.util.Assert;
@ -264,7 +264,7 @@ public interface ReactiveDocumentOperations {
* @param entityType must not be {@literal null}.
* @return a {@link Mono} emitting the number of the removed documents.
*/
Mono<Long> delete(Query query, Class<?> entityType);
Mono<ByQueryResponse> delete(Query query, Class<?> entityType);
/**
* Delete the documents matching the given {@link Query} extracting index from entity metadata.
@ -274,7 +274,7 @@ public interface ReactiveDocumentOperations {
* @param index the target index, must not be {@literal null}
* @return a {@link Mono} emitting the number of the removed documents.
*/
Mono<Long> delete(Query query, Class<?> entityType, IndexCoordinates index);
Mono<ByQueryResponse> delete(Query query, Class<?> entityType, IndexCoordinates index);
/**
* Partial update of the document.
@ -294,5 +294,5 @@ public interface ReactiveDocumentOperations {
* @return a {@link Mono} emitting the update response
* @since 4.2
*/
Mono<UpdateByQueryResponse> updateByQuery(UpdateQuery updateQuery, IndexCoordinates index);
Mono<ByQueryResponse> updateByQuery(UpdateQuery updateQuery, IndexCoordinates index);
}

View File

@ -74,7 +74,7 @@ import org.springframework.data.elasticsearch.core.query.BulkOptions;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.SeqNoPrimaryTerm;
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
import org.springframework.data.elasticsearch.core.query.UpdateResponse;
import org.springframework.data.elasticsearch.core.routing.DefaultRoutingResolver;
@ -526,11 +526,11 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
* @see org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations#delete(Query, Class, IndexCoordinates)
*/
@Override
public Mono<Long> delete(Query query, Class<?> entityType, IndexCoordinates index) {
public Mono<ByQueryResponse> delete(Query query, Class<?> entityType, IndexCoordinates index) {
Assert.notNull(query, "Query must not be null!");
return doDeleteBy(query, entityType, index).map(BulkByScrollResponse::getDeleted).next();
return doDeleteBy(query, entityType, index).map(ByQueryResponse::of);
}
@Override
@ -556,7 +556,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
}
@Override
public Mono<UpdateByQueryResponse> updateByQuery(UpdateQuery updateQuery, IndexCoordinates index) {
public Mono<ByQueryResponse> updateByQuery(UpdateQuery updateQuery, IndexCoordinates index) {
Assert.notNull(updateQuery, "updateQuery must not be null");
Assert.notNull(index, "Index must not be null");
@ -578,13 +578,13 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
}
@Override
public Mono<Long> delete(Query query, Class<?> entityType) {
public Mono<ByQueryResponse> delete(Query query, Class<?> entityType) {
return delete(query, entityType, getIndexCoordinatesFor(entityType));
}
private Flux<BulkByScrollResponse> doDeleteBy(Query query, Class<?> entityType, IndexCoordinates index) {
private Mono<BulkByScrollResponse> doDeleteBy(Query query, Class<?> entityType, IndexCoordinates index) {
return Flux.defer(() -> {
return Mono.defer(() -> {
DeleteByQueryRequest request = requestFactory.deleteByQueryRequest(query, entityType, index);
return doDeleteBy(prepareDeleteByRequest(request));
});

View File

@ -29,7 +29,7 @@ import org.springframework.lang.Nullable;
* @author Farid Faoudi
* @since 4.2
*/
public class UpdateByQueryResponse {
public class ByQueryResponse {
private final long took;
private final boolean timedOut;
@ -44,7 +44,7 @@ public class UpdateByQueryResponse {
@Nullable private final String reasonCancelled;
private final List<Failure> failures;
private UpdateByQueryResponse(long took, boolean timedOut, long total, long updated, long deleted, int batches,
private ByQueryResponse(long took, boolean timedOut, long total, long updated, long deleted, int batches,
long versionConflicts, long noops, long bulkRetries, long searchRetries, @Nullable String reasonCancelled,
List<Failure> failures) {
this.took = took;
@ -149,21 +149,21 @@ public class UpdateByQueryResponse {
}
/**
* Create a new {@link UpdateByQueryResponseBuilder} to build {@link UpdateByQueryResponse}
* Create a new {@link ByQueryResponseBuilder} to build {@link ByQueryResponse}
*
* @return a new {@link UpdateByQueryResponseBuilder} to build {@link UpdateByQueryResponse}
* @return a new {@link ByQueryResponseBuilder} to build {@link ByQueryResponse}
*/
public static UpdateByQueryResponseBuilder builder() {
return new UpdateByQueryResponseBuilder();
public static ByQueryResponseBuilder builder() {
return new ByQueryResponseBuilder();
}
public static UpdateByQueryResponse of(BulkByScrollResponse bulkByScrollResponse) {
public static ByQueryResponse of(BulkByScrollResponse bulkByScrollResponse) {
final List<Failure> failures = bulkByScrollResponse.getBulkFailures() //
.stream() //
.map(Failure::of) //
.collect(Collectors.toList()); //
return UpdateByQueryResponse.builder() //
return ByQueryResponse.builder() //
.withTook(bulkByScrollResponse.getTook().getMillis()) //
.withTimedOut(bulkByScrollResponse.isTimedOut()) //
.withTotal(bulkByScrollResponse.getTotal()) //
@ -331,7 +331,7 @@ public class UpdateByQueryResponse {
}
}
public static final class UpdateByQueryResponseBuilder {
public static final class ByQueryResponseBuilder {
private long took;
private boolean timedOut;
private long total;
@ -345,71 +345,71 @@ public class UpdateByQueryResponse {
@Nullable private String reasonCancelled;
private List<Failure> failures = Collections.emptyList();
private UpdateByQueryResponseBuilder() {}
private ByQueryResponseBuilder() {}
public UpdateByQueryResponseBuilder withTook(long took) {
public ByQueryResponseBuilder withTook(long took) {
this.took = took;
return this;
}
public UpdateByQueryResponseBuilder withTimedOut(boolean timedOut) {
public ByQueryResponseBuilder withTimedOut(boolean timedOut) {
this.timedOut = timedOut;
return this;
}
public UpdateByQueryResponseBuilder withTotal(long total) {
public ByQueryResponseBuilder withTotal(long total) {
this.total = total;
return this;
}
public UpdateByQueryResponseBuilder withUpdated(long updated) {
public ByQueryResponseBuilder withUpdated(long updated) {
this.updated = updated;
return this;
}
public UpdateByQueryResponseBuilder withDeleted(long deleted) {
public ByQueryResponseBuilder withDeleted(long deleted) {
this.deleted = deleted;
return this;
}
public UpdateByQueryResponseBuilder withBatches(int batches) {
public ByQueryResponseBuilder withBatches(int batches) {
this.batches = batches;
return this;
}
public UpdateByQueryResponseBuilder withVersionConflicts(long versionConflicts) {
public ByQueryResponseBuilder withVersionConflicts(long versionConflicts) {
this.versionConflicts = versionConflicts;
return this;
}
public UpdateByQueryResponseBuilder withNoops(long noops) {
public ByQueryResponseBuilder withNoops(long noops) {
this.noops = noops;
return this;
}
public UpdateByQueryResponseBuilder withBulkRetries(long bulkRetries) {
public ByQueryResponseBuilder withBulkRetries(long bulkRetries) {
this.bulkRetries = bulkRetries;
return this;
}
public UpdateByQueryResponseBuilder withSearchRetries(long searchRetries) {
public ByQueryResponseBuilder withSearchRetries(long searchRetries) {
this.searchRetries = searchRetries;
return this;
}
public UpdateByQueryResponseBuilder withReasonCancelled(String reasonCancelled) {
public ByQueryResponseBuilder withReasonCancelled(String reasonCancelled) {
this.reasonCancelled = reasonCancelled;
return this;
}
public UpdateByQueryResponseBuilder withFailures(List<Failure> failures) {
public ByQueryResponseBuilder withFailures(List<Failure> failures) {
this.failures = failures;
return this;
}
public UpdateByQueryResponse build() {
return new UpdateByQueryResponse(took, timedOut, total, updated, deleted, batches, versionConflicts, noops,
bulkRetries, searchRetries, reasonCancelled, failures);
public ByQueryResponse build() {
return new ByQueryResponse(took, timedOut, total, updated, deleted, batches, versionConflicts, noops, bulkRetries,
searchRetries, reasonCancelled, failures);
}
}
}

View File

@ -25,6 +25,7 @@ import org.springframework.data.elasticsearch.core.SearchHitSupport;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.repository.query.ReactiveElasticsearchQueryExecution.ResultProcessingConverter;
import org.springframework.data.elasticsearch.repository.query.ReactiveElasticsearchQueryExecution.ResultProcessingExecution;
@ -115,7 +116,8 @@ abstract class AbstractReactiveElasticsearchRepositoryQuery implements Repositor
ReactiveElasticsearchOperations operations) {
if (isDeleteQuery()) {
return (query, type, targetType, indexCoordinates) -> operations.delete(query, type, indexCoordinates);
return (query, type, targetType, indexCoordinates) -> operations.delete(query, type, indexCoordinates)
.map(ByQueryResponse::getDeleted);
} else if (isCountQuery()) {
return (query, type, targetType, indexCoordinates) -> operations.count(query, type, indexCoordinates);
} else if (isExistsQuery()) {

View File

@ -67,7 +67,7 @@ import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperatio
import org.springframework.data.elasticsearch.core.ReactiveIndexOperations;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
import org.springframework.data.elasticsearch.junit.jupiter.ReactiveElasticsearchRestTemplateConfiguration;
import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest;
import org.springframework.http.HttpHeaders;
@ -478,7 +478,7 @@ public class ReactiveElasticsearchClientIntegrationTests {
.setScript(new Script(ScriptType.INLINE, "painless", script, params)); //
client.updateBy(request) //
.map(UpdateByQueryResponse::getUpdated) //
.map(ByQueryResponse::getUpdated) //
.as(StepVerifier::create) //
.expectNext(2L) //
.verifyComplete(); //
@ -510,7 +510,7 @@ public class ReactiveElasticsearchClientIntegrationTests {
.setScript(new Script(ScriptType.INLINE, "painless", script, params)); //
client.updateBy(request) //
.map(UpdateByQueryResponse::getUpdated) //
.map(ByQueryResponse::getUpdated) //
.as(StepVerifier::create) //
.expectNext(0L) //
.verifyComplete(); //

View File

@ -630,7 +630,9 @@ public class ReactiveElasticsearchTemplateIntegrationTests {
template.delete(query, SampleEntity.class) //
.as(StepVerifier::create) //
.expectNext(0L) //
.consumeNextWith(byQueryResponse -> {
assertThat(byQueryResponse.getDeleted()).isEqualTo(0L);
})
.verifyComplete();
}
@ -654,6 +656,7 @@ public class ReactiveElasticsearchTemplateIntegrationTests {
.build();
template.delete(searchQuery, SampleEntity.class, IndexCoordinates.of(indexPrefix + '*')) //
.map(ByQueryResponse::getDeleted) //
.as(StepVerifier::create) //
.expectNext(2L) //
.verifyComplete();
@ -681,6 +684,7 @@ public class ReactiveElasticsearchTemplateIntegrationTests {
.build();
template.delete(searchQuery, SampleEntity.class, IndexCoordinates.of(indexPrefix + '*')) //
.map(ByQueryResponse::getDeleted) //
.as(StepVerifier::create) //
.expectNext(0L) //
.verifyComplete();
@ -696,6 +700,7 @@ public class ReactiveElasticsearchTemplateIntegrationTests {
CriteriaQuery query = new CriteriaQuery(new Criteria("message").contains("test"));
template.delete(query, SampleEntity.class) //
.map(ByQueryResponse::getDeleted) //
.as(StepVerifier::create) //
.expectNext(2L) //
.verifyComplete();
@ -709,6 +714,7 @@ public class ReactiveElasticsearchTemplateIntegrationTests {
CriteriaQuery query = new CriteriaQuery(new Criteria("message").contains("luke"));
template.delete(query, SampleEntity.class) //
.map(ByQueryResponse::getDeleted) //
.as(StepVerifier::create) //
.expectNext(0L) //
.verifyComplete();

View File

@ -23,6 +23,7 @@ import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;