Enhance refresh policy handling.

Original Pull Request #2725
Closes #2722
This commit is contained in:
Peter-Josef Meisch 2023-10-11 21:54:38 +02:00 committed by GitHub
parent 0b33d7fe57
commit 9f42ec965c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 344 additions and 15 deletions

View File

@ -12,6 +12,7 @@
* Enable MultiField annotation on property getter * Enable MultiField annotation on property getter
* Support nested sort option * Support nested sort option
* Improved scripted und runtime field support * Improved scripted und runtime field support
* Improved refresh policy support
[[new-features.5-1-0]] [[new-features.5-1-0]]
== New in Spring Data Elasticsearch 5.1 == New in Spring Data Elasticsearch 5.1

View File

@ -855,7 +855,7 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper
} }
// endregion // endregion
// region routing // region customization
private void setRoutingResolver(RoutingResolver routingResolver) { private void setRoutingResolver(RoutingResolver routingResolver) {
Assert.notNull(routingResolver, "routingResolver must not be null"); Assert.notNull(routingResolver, "routingResolver must not be null");
@ -873,5 +873,13 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper
return copy; return copy;
} }
@Override
public ElasticsearchOperations withRefreshPolicy(@Nullable RefreshPolicy refreshPolicy) {
var copy = copy();
copy.setRefreshPolicy(refreshPolicy);
return copy;
}
// endregion // endregion
} }

View File

@ -186,7 +186,7 @@ abstract public class AbstractReactiveElasticsearchTemplate
// endregion // endregion
// region routing // region customizations
private void setRoutingResolver(RoutingResolver routingResolver) { private void setRoutingResolver(RoutingResolver routingResolver) {
Assert.notNull(routingResolver, "routingResolver must not be null"); Assert.notNull(routingResolver, "routingResolver must not be null");
@ -203,6 +203,14 @@ abstract public class AbstractReactiveElasticsearchTemplate
copy.setRoutingResolver(routingResolver); copy.setRoutingResolver(routingResolver);
return copy; return copy;
} }
@Override
public ReactiveElasticsearchOperations withRefreshPolicy(@Nullable RefreshPolicy refreshPolicy) {
AbstractReactiveElasticsearchTemplate copy = copy();
copy.setRefreshPolicy(refreshPolicy);
return copy;
}
// endregion // endregion
// region DocumentOperations // region DocumentOperations

View File

@ -15,8 +15,6 @@
*/ */
package org.springframework.data.elasticsearch.core; package org.springframework.data.elasticsearch.core;
import java.util.Objects;
import org.springframework.data.elasticsearch.core.cluster.ClusterOperations; import org.springframework.data.elasticsearch.core.cluster.ClusterOperations;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
@ -91,15 +89,24 @@ public interface ElasticsearchOperations extends DocumentOperations, SearchOpera
} }
// endregion // endregion
// region routing // region customizations
/** /**
* Returns a copy of this instance with the same configuration, but that uses a different {@link RoutingResolver} to * Returns a copy of this instance with the same configuration, but that uses a different {@link RoutingResolver} to
* obtain routing information. * obtain routing information.
* *
* @param routingResolver the {@link RoutingResolver} value, must not be {@literal null}. * @param routingResolver the {@link RoutingResolver} value, must not be {@literal null}.
* @return DocumentOperations instance * @return {@link ElasticsearchOperations} instance
* @since 4.2 * @since 4.2
*/ */
ElasticsearchOperations withRouting(RoutingResolver routingResolver); ElasticsearchOperations withRouting(RoutingResolver routingResolver);
/**
* Returns a copy of this instance with the same configuration, but that uses a different {@link RefreshPolicy}.
*
* @param refreshPolicy the {@link RefreshPolicy} value.
* @return {@link ElasticsearchOperations} instance.
* @since 5.2
*/
ElasticsearchOperations withRefreshPolicy(@Nullable RefreshPolicy refreshPolicy);
// endregion // endregion
} }

View File

@ -77,7 +77,7 @@ public interface ReactiveElasticsearchOperations
*/ */
ReactiveClusterOperations cluster(); ReactiveClusterOperations cluster();
// region routing // region customizations
/** /**
* Returns a copy of this instance with the same configuration, but that uses a different {@link RoutingResolver} to * Returns a copy of this instance with the same configuration, but that uses a different {@link RoutingResolver} to
* obtain routing information. * obtain routing information.
@ -86,5 +86,14 @@ public interface ReactiveElasticsearchOperations
* @return DocumentOperations instance * @return DocumentOperations instance
*/ */
ReactiveElasticsearchOperations withRouting(RoutingResolver routingResolver); ReactiveElasticsearchOperations withRouting(RoutingResolver routingResolver);
/**
* Returns a copy of this instance with the same configuration, but that uses a different {@link RefreshPolicy}.
*
* @param refreshPolicy the {@link RefreshPolicy} value.
* @return {@link ReactiveElasticsearchOperations} instance.
* @since 5.2
*/
ReactiveElasticsearchOperations withRefreshPolicy(@Nullable RefreshPolicy refreshPolicy);
// endregion // endregion
} }

View File

@ -17,6 +17,7 @@ package org.springframework.data.elasticsearch.repository;
import org.springframework.data.domain.Page; import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.core.RefreshPolicy;
import org.springframework.data.repository.CrudRepository; import org.springframework.data.repository.CrudRepository;
import org.springframework.data.repository.NoRepositoryBean; import org.springframework.data.repository.NoRepositoryBean;
import org.springframework.data.repository.PagingAndSortingRepository; import org.springframework.data.repository.PagingAndSortingRepository;
@ -31,6 +32,7 @@ import org.springframework.lang.Nullable;
* @author Murali Chevuri * @author Murali Chevuri
* @author Peter-Josef Meisch * @author Peter-Josef Meisch
*/ */
@SuppressWarnings("unused")
@NoRepositoryBean @NoRepositoryBean
public interface ElasticsearchRepository<T, ID> extends PagingAndSortingRepository<T, ID>, CrudRepository<T, ID> { public interface ElasticsearchRepository<T, ID> extends PagingAndSortingRepository<T, ID>, CrudRepository<T, ID> {
@ -43,4 +45,39 @@ public interface ElasticsearchRepository<T, ID> extends PagingAndSortingReposito
* @return * @return
*/ */
Page<T> searchSimilar(T entity, @Nullable String[] fields, Pageable pageable); Page<T> searchSimilar(T entity, @Nullable String[] fields, Pageable pageable);
/**
* @since 5.2
*/
<S extends T> S save(S entity, @Nullable RefreshPolicy refreshPolicy);
/**
* @since 5.2
*/
<S extends T> Iterable<S> saveAll(Iterable<S> entities, @Nullable RefreshPolicy refreshPolicy);
/**
* @since 5.2
*/
void deleteById(ID id, @Nullable RefreshPolicy refreshPolicy);
/**
* @since 5.2
*/
void delete(T entity, @Nullable RefreshPolicy refreshPolicy);
/**
* @since 5.2
*/
void deleteAllById(Iterable<? extends ID> ids, @Nullable RefreshPolicy refreshPolicy);
/**
* @since 5.2
*/
void deleteAll(Iterable<? extends T> entities, @Nullable RefreshPolicy refreshPolicy);
/**
* @since 5.2
*/
void deleteAll(@Nullable RefreshPolicy refreshPolicy);
} }

View File

@ -1,7 +1,7 @@
/* /*
* Copyright 2019-2023 the original author or authors. * Copyright 2019-2023 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License", @Nullable RefreshPolicy refreshPolicy);
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
@ -15,9 +15,15 @@
*/ */
package org.springframework.data.elasticsearch.repository; package org.springframework.data.elasticsearch.repository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.reactivestreams.Publisher;
import org.springframework.data.elasticsearch.core.RefreshPolicy;
import org.springframework.data.repository.NoRepositoryBean; import org.springframework.data.repository.NoRepositoryBean;
import org.springframework.data.repository.reactive.ReactiveCrudRepository; import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import org.springframework.data.repository.reactive.ReactiveSortingRepository; import org.springframework.data.repository.reactive.ReactiveSortingRepository;
import org.springframework.lang.Nullable;
/** /**
* Elasticsearch specific {@link org.springframework.data.repository.Repository} interface with reactive support. * Elasticsearch specific {@link org.springframework.data.repository.Repository} interface with reactive support.
@ -25,6 +31,57 @@ import org.springframework.data.repository.reactive.ReactiveSortingRepository;
* @author Christoph Strobl * @author Christoph Strobl
* @since 3.2 * @since 3.2
*/ */
@SuppressWarnings("unused")
@NoRepositoryBean @NoRepositoryBean
public interface ReactiveElasticsearchRepository<T, ID> public interface ReactiveElasticsearchRepository<T, ID>
extends ReactiveSortingRepository<T, ID>, ReactiveCrudRepository<T, ID> {} extends ReactiveSortingRepository<T, ID>, ReactiveCrudRepository<T, ID> {
/**
* @since 5.2
*/
<S extends T> Mono<S> save(S entity, @Nullable RefreshPolicy refreshPolicy);
/**
* @since 5.2
*/
<S extends T> Flux<S> saveAll(Iterable<S> entities, @Nullable RefreshPolicy refreshPolicy);
/**
* @since 5.2
*/
<S extends T> Flux<S> saveAll(Publisher<S> entityStream, @Nullable RefreshPolicy refreshPolicy);
/**
* @since 5.2
*/
Mono<Void> deleteById(ID id, @Nullable RefreshPolicy refreshPolicy);
/**
* @since 5.2
*/
Mono<Void> deleteById(Publisher<ID> id, @Nullable RefreshPolicy refreshPolicy);
/**
* @since 5.2
*/
Mono<Void> delete(T entity, @Nullable RefreshPolicy refreshPolicy);
/**
* @since 5.2
*/
Mono<Void> deleteAllById(Iterable<? extends ID> ids, @Nullable RefreshPolicy refreshPolicy);
/**
* @since 5.2
*/
Mono<Void> deleteAll(Iterable<? extends T> entities, @Nullable RefreshPolicy refreshPolicy);
/**
* @since 5.2
*/
Mono<Void> deleteAll(Publisher<? extends T> entityStream, @Nullable RefreshPolicy refreshPolicy);
/**
* @since 5.2
*/
Mono<Void> deleteAll(@Nullable RefreshPolicy refreshPolicy);
}

View File

@ -21,6 +21,7 @@ import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.springframework.data.domain.Page; import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl; import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.PageRequest;
@ -177,10 +178,19 @@ public class SimpleElasticsearchRepository<T, ID> implements ElasticsearchReposi
Assert.notNull(entity, "Cannot save 'null' entity."); Assert.notNull(entity, "Cannot save 'null' entity.");
// noinspection ConstantConditions // noinspection DataFlowIssue
return executeAndRefresh(operations -> operations.save(entity, getIndexCoordinates())); return executeAndRefresh(operations -> operations.save(entity, getIndexCoordinates()));
} }
@Override
public <S extends T> S save(S entity, @Nullable RefreshPolicy refreshPolicy) {
Assert.notNull(entity, "entity must not be null");
// noinspection DataFlowIssue
return executeAndRefresh(operations -> operations.save(entity, getIndexCoordinates()), refreshPolicy);
}
public <S extends T> List<S> save(List<S> entities) { public <S extends T> List<S> save(List<S> entities) {
Assert.notNull(entities, "Cannot insert 'null' as a List."); Assert.notNull(entities, "Cannot insert 'null' as a List.");
@ -188,6 +198,13 @@ public class SimpleElasticsearchRepository<T, ID> implements ElasticsearchReposi
return Streamable.of(saveAll(entities)).stream().collect(Collectors.toList()); return Streamable.of(saveAll(entities)).stream().collect(Collectors.toList());
} }
public <S extends T> List<S> save(List<S> entities, @Nullable RefreshPolicy refreshPolicy) {
Assert.notNull(entities, "Cannot insert 'null' as a List.");
return Streamable.of(saveAll(entities, refreshPolicy)).stream().collect(Collectors.toList());
}
@Override @Override
public <S extends T> Iterable<S> saveAll(Iterable<S> entities) { public <S extends T> Iterable<S> saveAll(Iterable<S> entities) {
@ -199,6 +216,16 @@ public class SimpleElasticsearchRepository<T, ID> implements ElasticsearchReposi
return entities; return entities;
} }
@Override
public <S extends T> Iterable<S> saveAll(Iterable<S> entities, @Nullable RefreshPolicy refreshPolicy) {
Assert.notNull(entities, "Cannot insert 'null' as a List.");
IndexCoordinates indexCoordinates = getIndexCoordinates();
executeAndRefresh(operations -> operations.save(entities, indexCoordinates), refreshPolicy);
return entities;
}
@Override @Override
public boolean existsById(ID id) { public boolean existsById(ID id) {
// noinspection ConstantConditions // noinspection ConstantConditions
@ -233,6 +260,14 @@ public class SimpleElasticsearchRepository<T, ID> implements ElasticsearchReposi
doDelete(id, getIndexCoordinates()); doDelete(id, getIndexCoordinates());
} }
@Override
public void deleteById(ID id, @Nullable RefreshPolicy refreshPolicy) {
Assert.notNull(id, "Cannot delete entity with id 'null'.");
doDelete(id, getIndexCoordinates(), refreshPolicy);
}
@Override @Override
public void delete(T entity) { public void delete(T entity) {
@ -241,9 +276,40 @@ public class SimpleElasticsearchRepository<T, ID> implements ElasticsearchReposi
doDelete(extractIdFromBean(entity), getIndexCoordinates()); doDelete(extractIdFromBean(entity), getIndexCoordinates());
} }
@Override
public void delete(T entity, @Nullable RefreshPolicy refreshPolicy) {
Assert.notNull(entity, "Cannot delete 'null' entity.");
doDelete(extractIdFromBean(entity), getIndexCoordinates(), refreshPolicy);
}
@Override @Override
public void deleteAllById(Iterable<? extends ID> ids) { public void deleteAllById(Iterable<? extends ID> ids) {
// noinspection DuplicatedCode
Assert.notNull(ids, "Cannot delete 'null' list.");
List<String> idStrings = new ArrayList<>();
for (ID id : ids) {
idStrings.add(stringIdRepresentation(id));
}
if (idStrings.isEmpty()) {
return;
}
Query query = operations.idsQuery(idStrings);
executeAndRefresh((OperationsCallback<Void>) operations -> {
operations.delete(query, entityClass, getIndexCoordinates());
return null;
});
}
@Override
public void deleteAllById(Iterable<? extends ID> ids, @Nullable RefreshPolicy refreshPolicy) {
// noinspection DuplicatedCode
Assert.notNull(ids, "Cannot delete 'null' list."); Assert.notNull(ids, "Cannot delete 'null' list.");
List<String> idStrings = new ArrayList<>(); List<String> idStrings = new ArrayList<>();
@ -264,7 +330,16 @@ public class SimpleElasticsearchRepository<T, ID> implements ElasticsearchReposi
@Override @Override
public void deleteAll(Iterable<? extends T> entities) { public void deleteAll(Iterable<? extends T> entities) {
deleteAllById(getEntityIds(entities));
}
@Override
public void deleteAll(Iterable<? extends T> entities, @Nullable RefreshPolicy refreshPolicy) {
deleteAllById(getEntityIds(entities), refreshPolicy);
}
@NotNull
private List<ID> getEntityIds(Iterable<? extends T> entities) {
Assert.notNull(entities, "Cannot delete 'null' list."); Assert.notNull(entities, "Cannot delete 'null' list.");
List<ID> ids = new ArrayList<>(); List<ID> ids = new ArrayList<>();
@ -274,8 +349,7 @@ public class SimpleElasticsearchRepository<T, ID> implements ElasticsearchReposi
ids.add(id); ids.add(id);
} }
} }
return ids;
deleteAllById(ids);
} }
private void doDelete(@Nullable ID id, IndexCoordinates indexCoordinates) { private void doDelete(@Nullable ID id, IndexCoordinates indexCoordinates) {
@ -285,16 +359,30 @@ public class SimpleElasticsearchRepository<T, ID> implements ElasticsearchReposi
} }
} }
private void doDelete(@Nullable ID id, IndexCoordinates indexCoordinates, @Nullable RefreshPolicy refreshPolicy) {
if (id != null) {
executeAndRefresh(operations -> operations.delete(stringIdRepresentation(id), indexCoordinates), refreshPolicy);
}
}
@Override @Override
public void deleteAll() { public void deleteAll() {
IndexCoordinates indexCoordinates = getIndexCoordinates();
executeAndRefresh((OperationsCallback<Void>) operations -> { executeAndRefresh((OperationsCallback<Void>) operations -> {
operations.delete(Query.findAll(), entityClass, indexCoordinates); operations.delete(Query.findAll(), entityClass, getIndexCoordinates());
return null; return null;
}); });
} }
@Override
public void deleteAll(@Nullable RefreshPolicy refreshPolicy) {
executeAndRefresh((OperationsCallback<Void>) operations -> {
operations.delete(Query.findAll(), entityClass, getIndexCoordinates());
return null;
}, refreshPolicy);
}
private void doRefresh() { private void doRefresh() {
RefreshPolicy refreshPolicy = null; RefreshPolicy refreshPolicy = null;
@ -352,5 +440,12 @@ public class SimpleElasticsearchRepository<T, ID> implements ElasticsearchReposi
doRefresh(); doRefresh();
return result; return result;
} }
@Nullable
public <R> R executeAndRefresh(OperationsCallback<R> callback, @Nullable RefreshPolicy refreshPolicy) {
R result = callback.doWithOperations(operations.withRefreshPolicy(refreshPolicy));
doRefresh();
return result;
}
// endregion // endregion
} }

View File

@ -34,6 +34,7 @@ import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.BaseQuery; import org.springframework.data.elasticsearch.core.query.BaseQuery;
import org.springframework.data.elasticsearch.core.query.Query; import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.repository.ReactiveElasticsearchRepository; import org.springframework.data.elasticsearch.repository.ReactiveElasticsearchRepository;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert; import org.springframework.util.Assert;
/** /**
@ -96,13 +97,31 @@ public class SimpleReactiveElasticsearchRepository<T, ID> implements ReactiveEla
.flatMap(saved -> doRefresh().thenReturn(saved)); .flatMap(saved -> doRefresh().thenReturn(saved));
} }
@Override
public <S extends T> Mono<S> save(S entity, @Nullable RefreshPolicy refreshPolicy) {
Assert.notNull(entity, "Entity must not be null!");
return operations.withRefreshPolicy(refreshPolicy).save(entity, entityInformation.getIndexCoordinates())
.flatMap(saved -> doRefresh().thenReturn(saved));
}
@Override @Override
public <S extends T> Flux<S> saveAll(Iterable<S> entities) { public <S extends T> Flux<S> saveAll(Iterable<S> entities) {
Assert.notNull(entities, "Entities must not be null!"); Assert.notNull(entities, "Entities must not be null!");
return saveAll(Flux.fromIterable(entities)); return saveAll(Flux.fromIterable(entities));
} }
@Override
public <S extends T> Flux<S> saveAll(Iterable<S> entities, @Nullable RefreshPolicy refreshPolicy) {
Assert.notNull(entities, "Entities must not be null!");
return saveAll(Flux.fromIterable(entities), refreshPolicy);
}
@Override @Override
public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) { public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) {
@ -112,6 +131,16 @@ public class SimpleReactiveElasticsearchRepository<T, ID> implements ReactiveEla
.concatWith(doRefresh().then(Mono.empty())); .concatWith(doRefresh().then(Mono.empty()));
} }
@Override
public <S extends T> Flux<S> saveAll(Publisher<S> entityStream, @Nullable RefreshPolicy refreshPolicy) {
Assert.notNull(entityStream, "EntityStream must not be null!");
return operations.withRefreshPolicy(refreshPolicy)
.save(Flux.from(entityStream), entityInformation.getIndexCoordinates())
.concatWith(doRefresh().then(Mono.empty()));
}
@Override @Override
public Mono<T> findById(ID id) { public Mono<T> findById(ID id) {
@ -192,25 +221,54 @@ public class SimpleReactiveElasticsearchRepository<T, ID> implements ReactiveEla
public Mono<Void> deleteById(ID id) { public Mono<Void> deleteById(ID id) {
Assert.notNull(id, "Id must not be null!"); Assert.notNull(id, "Id must not be null!");
return operations.delete(convertId(id), entityInformation.getIndexCoordinates()) // return operations.delete(convertId(id), entityInformation.getIndexCoordinates()) //
.then(doRefresh()); .then(doRefresh());
} }
@Override
public Mono<Void> deleteById(ID id, @Nullable RefreshPolicy refreshPolicy) {
Assert.notNull(id, "Id must not be null!");
return operations.withRefreshPolicy(refreshPolicy).delete(convertId(id), entityInformation.getIndexCoordinates()) //
.then(doRefresh());
}
@Override @Override
public Mono<Void> deleteById(Publisher<ID> id) { public Mono<Void> deleteById(Publisher<ID> id) {
Assert.notNull(id, "Id must not be null!"); Assert.notNull(id, "Id must not be null!");
return Mono.from(id).flatMap(this::deleteById); return Mono.from(id).flatMap(this::deleteById);
} }
@Override
public Mono<Void> deleteById(Publisher<ID> id, @Nullable RefreshPolicy refreshPolicy) {
Assert.notNull(id, "Id must not be null!");
return Mono.from(id).flatMap(id2 -> deleteById(id, refreshPolicy));
}
@Override @Override
public Mono<Void> delete(T entity) { public Mono<Void> delete(T entity) {
Assert.notNull(entity, "Entity must not be null!"); Assert.notNull(entity, "Entity must not be null!");
return operations.delete(entity, entityInformation.getIndexCoordinates()) // return operations.delete(entity, entityInformation.getIndexCoordinates()) //
.then(doRefresh()); .then(doRefresh());
} }
@Override
public Mono<Void> delete(T entity, @Nullable RefreshPolicy refreshPolicy) {
Assert.notNull(entity, "Entity must not be null!");
return operations.withRefreshPolicy(refreshPolicy).delete(entity, entityInformation.getIndexCoordinates()) //
.then(doRefresh());
}
@Override @Override
public Mono<Void> deleteAllById(Iterable<? extends ID> ids) { public Mono<Void> deleteAllById(Iterable<? extends ID> ids) {
@ -225,17 +283,43 @@ public class SimpleReactiveElasticsearchRepository<T, ID> implements ReactiveEla
.then(doRefresh()); .then(doRefresh());
} }
@Override
public Mono<Void> deleteAllById(Iterable<? extends ID> ids, @Nullable RefreshPolicy refreshPolicy) {
Assert.notNull(ids, "Ids must not be null!");
var operationsWithRefreshPolicy = operations.withRefreshPolicy(refreshPolicy);
return Flux.fromIterable(ids) //
.map(this::convertId) //
.collectList() //
.map(operations::idsQuery) //
.flatMap(
query -> operationsWithRefreshPolicy.delete(query, entityInformation.getJavaType(),
entityInformation.getIndexCoordinates())) //
.then(doRefresh());
}
@Override @Override
public Mono<Void> deleteAll(Iterable<? extends T> entities) { public Mono<Void> deleteAll(Iterable<? extends T> entities) {
Assert.notNull(entities, "Entities must not be null!"); Assert.notNull(entities, "Entities must not be null!");
return deleteAll(Flux.fromIterable(entities)); return deleteAll(Flux.fromIterable(entities));
} }
@Override
public Mono<Void> deleteAll(Iterable<? extends T> entities, @Nullable RefreshPolicy refreshPolicy) {
Assert.notNull(entities, "Entities must not be null!");
return deleteAll(Flux.fromIterable(entities), refreshPolicy);
}
@Override @Override
public Mono<Void> deleteAll(Publisher<? extends T> entityStream) { public Mono<Void> deleteAll(Publisher<? extends T> entityStream) {
Assert.notNull(entityStream, "EntityStream must not be null!"); Assert.notNull(entityStream, "EntityStream must not be null!");
return Flux.from(entityStream) // return Flux.from(entityStream) //
.map(entityInformation::getRequiredId) // .map(entityInformation::getRequiredId) //
.map(this::convertId) // .map(this::convertId) //
@ -247,12 +331,35 @@ public class SimpleReactiveElasticsearchRepository<T, ID> implements ReactiveEla
} }
@Override @Override
public Mono<Void> deleteAll() { public Mono<Void> deleteAll(Publisher<? extends T> entityStream, @Nullable RefreshPolicy refreshPolicy) {
Assert.notNull(entityStream, "EntityStream must not be null!");
var operationsWithRefreshPolicy = operations.withRefreshPolicy(refreshPolicy);
return Flux.from(entityStream) //
.map(entityInformation::getRequiredId) //
.map(this::convertId) //
.collectList() //
.map(operations::idsQuery)
.flatMap(
query -> operationsWithRefreshPolicy.delete(query, entityInformation.getJavaType(),
entityInformation.getIndexCoordinates())) //
.then(doRefresh());
}
@Override
public Mono<Void> deleteAll() {
return operations.delete(Query.findAll(), entityInformation.getJavaType(), entityInformation.getIndexCoordinates()) // return operations.delete(Query.findAll(), entityInformation.getJavaType(), entityInformation.getIndexCoordinates()) //
.then(doRefresh()); .then(doRefresh());
} }
@Override
public Mono<Void> deleteAll(@Nullable RefreshPolicy refreshPolicy) {
return operations.withRefreshPolicy(refreshPolicy)
.delete(Query.findAll(), entityInformation.getJavaType(), entityInformation.getIndexCoordinates()) //
.then(doRefresh());
}
private String convertId(Object id) { private String convertId(Object id) {
return operations.getElasticsearchConverter().convertId(id); return operations.getElasticsearchConverter().convertId(id);
} }