DATAES-898 - Add join-type relevant parts to reactive calls.

Original PR: #504
This commit is contained in:
Peter-Josef Meisch 2020-08-13 18:05:49 +02:00 committed by GitHub
parent 7b1e4cc126
commit c8c6e7a646
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 72 additions and 107 deletions

View File

@ -46,7 +46,6 @@ import org.springframework.data.elasticsearch.core.document.SearchDocumentRespon
import org.springframework.data.elasticsearch.core.event.AfterConvertCallback; import org.springframework.data.elasticsearch.core.event.AfterConvertCallback;
import org.springframework.data.elasticsearch.core.event.AfterSaveCallback; import org.springframework.data.elasticsearch.core.event.AfterSaveCallback;
import org.springframework.data.elasticsearch.core.event.BeforeConvertCallback; import org.springframework.data.elasticsearch.core.event.BeforeConvertCallback;
import org.springframework.data.elasticsearch.core.join.JoinField;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
@ -79,10 +78,10 @@ import org.springframework.util.Assert;
*/ */
public abstract class AbstractElasticsearchTemplate implements ElasticsearchOperations, ApplicationContextAware { public abstract class AbstractElasticsearchTemplate implements ElasticsearchOperations, ApplicationContextAware {
protected @Nullable ElasticsearchConverter elasticsearchConverter; @Nullable protected ElasticsearchConverter elasticsearchConverter;
protected @Nullable RequestFactory requestFactory; @Nullable protected RequestFactory requestFactory;
@Nullable private EntityOperations entityOperations;
private @Nullable EntityCallbacks entityCallbacks; @Nullable private EntityCallbacks entityCallbacks;
// region Initialization // region Initialization
protected void initialize(ElasticsearchConverter elasticsearchConverter) { protected void initialize(ElasticsearchConverter elasticsearchConverter) {
@ -90,6 +89,7 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper
Assert.notNull(elasticsearchConverter, "elasticsearchConverter must not be null."); Assert.notNull(elasticsearchConverter, "elasticsearchConverter must not be null.");
this.elasticsearchConverter = elasticsearchConverter; this.elasticsearchConverter = elasticsearchConverter;
this.entityOperations = new EntityOperations(this.elasticsearchConverter.getMappingContext());
requestFactory = new RequestFactory(elasticsearchConverter); requestFactory = new RequestFactory(elasticsearchConverter);
VersionInfo.logVersions(getClusterVersion()); VersionInfo.logVersions(getClusterVersion());
@ -524,11 +524,11 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper
@Nullable @Nullable
private String getEntityId(Object entity) { private String getEntityId(Object entity) {
ElasticsearchPersistentEntity<?> persistentEntity = getRequiredPersistentEntity(entity.getClass());
ElasticsearchPersistentProperty idProperty = persistentEntity.getIdProperty();
if (idProperty != null) { Object id = entityOperations.forEntity(entity, elasticsearchConverter.getConversionService()).getId();
return stringIdRepresentation(persistentEntity.getPropertyAccessor(entity).getProperty(idProperty));
if (id != null) {
return stringIdRepresentation(id);
} }
return null; return null;
@ -536,36 +536,16 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper
@Nullable @Nullable
public String getEntityRouting(Object entity) { public String getEntityRouting(Object entity) {
ElasticsearchPersistentEntity<?> persistentEntity = elasticsearchConverter.getMappingContext() return entityOperations.forEntity(entity, elasticsearchConverter.getConversionService()).getRouting();
.getPersistentEntity(entity.getClass());
if (persistentEntity != null) {
ElasticsearchPersistentProperty joinProperty = persistentEntity.getJoinFieldProperty();
if (joinProperty != null) {
Object joinField = persistentEntity.getPropertyAccessor(entity).getProperty(joinProperty);
if (joinField != null && JoinField.class.isAssignableFrom(joinField.getClass())
&& ((JoinField<?>) joinField).getParent() != null) {
return elasticsearchConverter.convertId(((JoinField<?>) joinField).getParent());
}
}
}
return null;
} }
@Nullable @Nullable
private Long getEntityVersion(Object entity) { private Long getEntityVersion(Object entity) {
ElasticsearchPersistentEntity<?> persistentEntity = getRequiredPersistentEntity(entity.getClass());
ElasticsearchPersistentProperty versionProperty = persistentEntity.getVersionProperty();
if (versionProperty != null) { Number version = entityOperations.forEntity(entity, elasticsearchConverter.getConversionService()).getVersion();
Object version = persistentEntity.getPropertyAccessor(entity).getProperty(versionProperty);
if (version != null && Long.class.isAssignableFrom(version.getClass())) { if (version != null && Long.class.isAssignableFrom(version.getClass())) {
return ((Long) version); return ((Long) version);
}
} }
return null; return null;
@ -573,18 +553,10 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper
@Nullable @Nullable
private SeqNoPrimaryTerm getEntitySeqNoPrimaryTerm(Object entity) { private SeqNoPrimaryTerm getEntitySeqNoPrimaryTerm(Object entity) {
ElasticsearchPersistentEntity<?> persistentEntity = getRequiredPersistentEntity(entity.getClass());
ElasticsearchPersistentProperty property = persistentEntity.getSeqNoPrimaryTermProperty();
if (property != null) { EntityOperations.AdaptibleEntity<Object> adaptibleEntity = entityOperations.forEntity(entity,
Object seqNoPrimaryTerm = persistentEntity.getPropertyAccessor(entity).getProperty(property); elasticsearchConverter.getConversionService());
return adaptibleEntity.hasSeqNoPrimaryTerm() ? adaptibleEntity.getSeqNoPrimaryTerm() : null;
if (seqNoPrimaryTerm != null && SeqNoPrimaryTerm.class.isAssignableFrom(seqNoPrimaryTerm.getClass())) {
return (SeqNoPrimaryTerm) seqNoPrimaryTerm;
}
}
return null;
} }
private <T> IndexQuery getIndexQuery(T entity) { private <T> IndexQuery getIndexQuery(T entity) {

View File

@ -18,6 +18,7 @@ package org.springframework.data.elasticsearch.core;
import java.util.Map; import java.util.Map;
import org.springframework.core.convert.ConversionService; import org.springframework.core.convert.ConversionService;
import org.springframework.data.elasticsearch.core.join.JoinField;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
@ -296,9 +297,19 @@ class EntityOperations {
* Returns SeqNoPropertyTerm for this entity. * Returns SeqNoPropertyTerm for this entity.
* *
* @return SeqNoPrimaryTerm, may be {@literal null} * @return SeqNoPrimaryTerm, may be {@literal null}
* @since 4.0
*/ */
@Nullable @Nullable
SeqNoPrimaryTerm getSeqNoPrimaryTerm(); SeqNoPrimaryTerm getSeqNoPrimaryTerm();
/**
* returns the routing for the entity if it is available
*
* @return routing if available
* @since 4.1
*/
@Nullable
String getRouting();
} }
/** /**
@ -421,6 +432,11 @@ class EntityOperations {
public ElasticsearchPersistentEntity<?> getPersistentEntity() { public ElasticsearchPersistentEntity<?> getPersistentEntity() {
return null; return null;
} }
@Override
public String getRouting() {
return null;
}
} }
/** /**
@ -492,56 +508,32 @@ class EntityOperations {
return new MappedEntity<>(entity, identifierAccessor, propertyAccessor); return new MappedEntity<>(entity, identifierAccessor, propertyAccessor);
} }
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#getId()
*/
@Override @Override
public Object getId() { public Object getId() {
return idAccessor.getIdentifier(); return idAccessor.getIdentifier();
} }
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#isVersionedEntity()
*/
@Override @Override
public boolean isVersionedEntity() { public boolean isVersionedEntity() {
return entity.hasVersionProperty(); return entity.hasVersionProperty();
} }
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#getVersion()
*/
@Override @Override
@Nullable @Nullable
public Object getVersion() { public Object getVersion() {
return propertyAccessor.getProperty(entity.getRequiredVersionProperty()); return propertyAccessor.getProperty(entity.getVersionProperty());
} }
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#getBean()
*/
@Override @Override
public T getBean() { public T getBean() {
return propertyAccessor.getBean(); return propertyAccessor.getBean();
} }
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#isNew()
*/
@Override @Override
public boolean isNew() { public boolean isNew() {
return entity.isNew(propertyAccessor.getBean()); return entity.isNew(propertyAccessor.getBean());
} }
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.Entity#getPersistentEntity()
*/
@Override @Override
public ElasticsearchPersistentEntity<?> getPersistentEntity() { public ElasticsearchPersistentEntity<?> getPersistentEntity() {
return entity; return entity;
@ -557,15 +549,17 @@ class EntityOperations {
private final ElasticsearchPersistentEntity<?> entity; private final ElasticsearchPersistentEntity<?> entity;
private final ConvertingPropertyAccessor<T> propertyAccessor; private final ConvertingPropertyAccessor<T> propertyAccessor;
private final IdentifierAccessor identifierAccessor; private final IdentifierAccessor identifierAccessor;
private final ConversionService conversionService;
private AdaptibleMappedEntity(ElasticsearchPersistentEntity<?> entity, IdentifierAccessor identifierAccessor, private AdaptibleMappedEntity(ElasticsearchPersistentEntity<?> entity, IdentifierAccessor identifierAccessor,
ConvertingPropertyAccessor<T> propertyAccessor) { ConvertingPropertyAccessor<T> propertyAccessor, ConversionService conversionService) {
super(entity, identifierAccessor, propertyAccessor); super(entity, identifierAccessor, propertyAccessor);
this.entity = entity; this.entity = entity;
this.propertyAccessor = propertyAccessor; this.propertyAccessor = propertyAccessor;
this.identifierAccessor = identifierAccessor; this.identifierAccessor = identifierAccessor;
this.conversionService = conversionService;
} }
static <T> AdaptibleEntity<T> of(T bean, static <T> AdaptibleEntity<T> of(T bean,
@ -577,22 +571,14 @@ class EntityOperations {
PersistentPropertyAccessor<T> propertyAccessor = entity.getPropertyAccessor(bean); PersistentPropertyAccessor<T> propertyAccessor = entity.getPropertyAccessor(bean);
return new AdaptibleMappedEntity<>(entity, identifierAccessor, return new AdaptibleMappedEntity<>(entity, identifierAccessor,
new ConvertingPropertyAccessor<>(propertyAccessor, conversionService)); new ConvertingPropertyAccessor<>(propertyAccessor, conversionService), conversionService);
} }
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#hasParent()
*/
@Override @Override
public boolean hasParent() { public boolean hasParent() {
return getRequiredPersistentEntity().getParentIdProperty() != null; return getRequiredPersistentEntity().getParentIdProperty() != null;
} }
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#getParentId()
*/
@Deprecated @Deprecated
@Override @Override
public Object getParentId() { public Object getParentId() {
@ -601,10 +587,6 @@ class EntityOperations {
return propertyAccessor.getProperty(parentProperty); return propertyAccessor.getProperty(parentProperty);
} }
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#populateIdIfNecessary(java.lang.Object)
*/
@Nullable @Nullable
@Override @Override
public T populateIdIfNecessary(@Nullable Object id) { public T populateIdIfNecessary(@Nullable Object id) {
@ -629,17 +611,12 @@ class EntityOperations {
return propertyAccessor.getBean(); return propertyAccessor.getBean();
} }
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.MappedEntity#getVersion()
*/
@Override @Override
@Nullable @Nullable
public Number getVersion() { public Number getVersion() {
ElasticsearchPersistentProperty versionProperty = entity.getRequiredVersionProperty(); ElasticsearchPersistentProperty versionProperty = entity.getVersionProperty();
return versionProperty != null ? propertyAccessor.getProperty(versionProperty, Number.class) : null;
return propertyAccessor.getProperty(versionProperty, Number.class);
} }
@Override @Override
@ -655,10 +632,6 @@ class EntityOperations {
return propertyAccessor.getProperty(seqNoPrimaryTermProperty, SeqNoPrimaryTerm.class); return propertyAccessor.getProperty(seqNoPrimaryTermProperty, SeqNoPrimaryTerm.class);
} }
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#initializeVersionProperty()
*/
@Override @Override
public T initializeVersionProperty() { public T initializeVersionProperty() {
@ -673,10 +646,6 @@ class EntityOperations {
return propertyAccessor.getBean(); return propertyAccessor.getBean();
} }
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity#incrementVersion()
*/
@Override @Override
public T incrementVersion() { public T incrementVersion() {
@ -688,6 +657,22 @@ class EntityOperations {
return propertyAccessor.getBean(); return propertyAccessor.getBean();
} }
@Override
public String getRouting() {
ElasticsearchPersistentProperty joinFieldProperty = entity.getJoinFieldProperty();
if (joinFieldProperty != null) {
JoinField<?> joinField = propertyAccessor.getProperty(joinFieldProperty, JoinField.class);
if (joinField != null && joinField.getParent() != null) {
return conversionService.convert(joinField.getParent(), String.class);
}
}
return null;
}
} }
} }

View File

@ -55,7 +55,6 @@ import org.springframework.data.elasticsearch.NoSuchIndexException;
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException; import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient; import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity; import org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity;
import org.springframework.data.elasticsearch.core.EntityOperations.Entity;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter; import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
import org.springframework.data.elasticsearch.core.document.Document; import org.springframework.data.elasticsearch.core.document.Document;
@ -399,6 +398,8 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
} }
} }
query.setRouting(entity.getRouting());
return query; return query;
} }
@ -414,11 +415,10 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
DocumentCallback<T> callback = new ReadDocumentCallback<>(converter, entityType, index); DocumentCallback<T> callback = new ReadDocumentCallback<>(converter, entityType, index);
return doGet(id, getPersistentEntityFor(entityType), index) return doGet(id, index).flatMap(it -> callback.doWith(DocumentAdapters.from(it)));
.flatMap(it -> callback.doWith(DocumentAdapters.from(it)));
} }
private Mono<GetResult> doGet(String id, ElasticsearchPersistentEntity<?> entity, IndexCoordinates index) { private Mono<GetResult> doGet(String id, IndexCoordinates index) {
return Mono.defer(() -> doGet(requestFactory.getRequest(id, index))); return Mono.defer(() -> doGet(requestFactory.getRequest(id, index)));
} }
@ -441,9 +441,17 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
@Override @Override
public Mono<String> delete(Object entity, IndexCoordinates index) { public Mono<String> delete(Object entity, IndexCoordinates index) {
Entity<?> elasticsearchEntity = operations.forEntity(entity); AdaptibleEntity<?> elasticsearchEntity = operations.forEntity(entity, converter.getConversionService());
return Mono.defer(() -> doDeleteById(converter.convertId(elasticsearchEntity.getId()), index)); if (elasticsearchEntity.getId() == null) {
return Mono.error(new IllegalArgumentException("entity must have an id"));
}
return Mono.defer(() -> {
String id = converter.convertId(elasticsearchEntity.getId());
String routing = elasticsearchEntity.getRouting();
return doDeleteById(id, routing, index);
});
} }
@Override @Override
@ -466,13 +474,13 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
Assert.notNull(id, "id must not be null"); Assert.notNull(id, "id must not be null");
Assert.notNull(index, "index must not be null"); Assert.notNull(index, "index must not be null");
return doDeleteById(id, index); return doDeleteById(id, null, index);
} }
private Mono<String> doDeleteById(String id, IndexCoordinates index) { private Mono<String> doDeleteById(String id, @Nullable String routing, IndexCoordinates index) {
return Mono.defer(() -> { return Mono.defer(() -> {
DeleteRequest request = requestFactory.deleteRequest(id, null, index); DeleteRequest request = requestFactory.deleteRequest(id, routing, index);
return doDelete(prepareDeleteRequest(request)); return doDelete(prepareDeleteRequest(request));
}); });
} }