DATAES-785 - Polishing.

This commit is contained in:
Peter-Josef Meisch 2020-04-21 21:57:31 +02:00
parent 7501c19be4
commit 60cbb67877
5 changed files with 23 additions and 32 deletions

View File

@ -593,7 +593,7 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper
private final Class<T> type;
public ReadSearchScrollDocumentResponseCallback(Class<T> type, IndexCoordinates index) {
Assert.notNull(type, "type is null");
this.delegate = new ReadDocumentCallback<>(elasticsearchConverter, type, index);

View File

@ -194,8 +194,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
IndexResponse indexResponse = it.getT2();
AdaptibleEntity<T> adaptableEntity = operations.forEntity(savedEntity, converter.getConversionService());
return adaptableEntity.populateIdIfNecessary(indexResponse.getId());
})
.flatMap(saved -> maybeCallAfterSave(saved, index));
}).flatMap(saved -> maybeCallAfterSave(saved, index));
}
@Override
@ -208,31 +207,25 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
Assert.notNull(entitiesPublisher, "Entities must not be null!");
return entitiesPublisher
.flatMapMany(entities -> {
return Flux.fromIterable(entities) //
.concatMap(entity -> maybeCallBeforeConvert(entity, index));
})
.collectList()
.map(Entities::new)
.flatMapMany(entities -> {
if (entities.isEmpty()) {
return Flux.empty();
}
return entitiesPublisher.flatMapMany(entities -> {
return Flux.fromIterable(entities) //
.concatMap(entity -> maybeCallBeforeConvert(entity, index));
}).collectList().map(Entities::new).flatMapMany(entities -> {
if (entities.isEmpty()) {
return Flux.empty();
}
return doBulkOperation(entities.indexQueries(), BulkOptions.defaultOptions(), index) //
.index()
.flatMap(indexAndResponse -> {
T savedEntity = entities.entityAt(indexAndResponse.getT1());
BulkItemResponse bulkItemResponse = indexAndResponse.getT2();
return doBulkOperation(entities.indexQueries(), BulkOptions.defaultOptions(), index) //
.index().flatMap(indexAndResponse -> {
T savedEntity = entities.entityAt(indexAndResponse.getT1());
BulkItemResponse bulkItemResponse = indexAndResponse.getT2();
AdaptibleEntity<T> adaptibleEntity = operations.forEntity(savedEntity,
converter.getConversionService());
adaptibleEntity.populateIdIfNecessary(bulkItemResponse.getResponse().getId());
AdaptibleEntity<T> adaptibleEntity = operations.forEntity(savedEntity, converter.getConversionService());
adaptibleEntity.populateIdIfNecessary(bulkItemResponse.getResponse().getId());
return maybeCallAfterSave(savedEntity, index);
});
});
return maybeCallAfterSave(savedEntity, index);
});
});
}
@Override
@ -1017,9 +1010,7 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
}
private List<IndexQuery> indexQueries() {
return entities.stream()
.map(ReactiveElasticsearchTemplate.this::getIndexQuery)
.collect(Collectors.toList());
return entities.stream().map(ReactiveElasticsearchTemplate.this::getIndexQuery).collect(Collectors.toList());
}
private T entityAt(long index) {

View File

@ -15,12 +15,12 @@
*/
package org.springframework.data.elasticsearch.core.event;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import reactor.core.publisher.Mono;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.core.Ordered;
import org.springframework.data.auditing.IsNewAwareAuditingHandler;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.mapping.callback.EntityCallback;
import org.springframework.util.Assert;

View File

@ -201,8 +201,8 @@ public class ReactiveElasticsearchTemplateCallbackTests {
Person entity1 = new Person("init1", "luke1");
Person entity2 = new Person("init2", "luke2");
List<Person> saved = template.saveAll(Mono.just(Arrays.asList(entity1, entity2)), index)
.toStream().collect(Collectors.toList());
List<Person> saved = template.saveAll(Mono.just(Arrays.asList(entity1, entity2)), index).toStream()
.collect(Collectors.toList());
verify(afterSaveCallback, times(2)).onAfterSave(any(), eq(index));
assertThat(saved.get(0).firstname).isEqualTo("after-save");

View File

@ -18,7 +18,6 @@ package org.springframework.data.elasticsearch.core.event;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import reactor.test.StepVerifier;
import java.time.LocalDateTime;
@ -33,6 +32,7 @@ import org.springframework.data.annotation.CreatedDate;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.LastModifiedDate;
import org.springframework.data.auditing.IsNewAwareAuditingHandler;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
import org.springframework.data.mapping.context.PersistentEntities;
import org.springframework.lang.Nullable;