Support different routing for each id in multiget.

Original Pull Request #1956 
Closes #1954
This commit is contained in:
Peter-Josef Meisch 2021-10-09 19:43:16 +02:00 committed by GitHub
parent 5e4ce56414
commit 2450d579e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 211 additions and 40 deletions

View File

@ -15,6 +15,7 @@
*/
package org.springframework.data.elasticsearch.core;
import java.util.Collection;
import java.util.List;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
@ -121,6 +122,8 @@ public interface DocumentOperations {
* @param query the query defining the ids of the objects to get
* @param clazz the type of the object to be returned
* @return list of {@link MultiGetItem}s
* @see Query#multiGetQuery(Collection)
* @see Query#multiGetQueryWithRouting(List)
* @since 4.1
*/
<T> List<MultiGetItem<T>> multiGet(Query query, Class<T> clazz);
@ -132,6 +135,8 @@ public interface DocumentOperations {
* @param clazz the type of the object to be returned
* @param index the index(es) from which the objects are read.
* @return list of {@link MultiGetItem}s
* @see Query#multiGetQuery(Collection)
* @see Query#multiGetQueryWithRouting(List)
*/
<T> List<MultiGetItem<T>> multiGet(Query query, Class<T> clazz, IndexCoordinates index);

View File

@ -182,7 +182,6 @@ public class ElasticsearchRestTemplate extends AbstractElasticsearchRestTranspor
public <T> List<MultiGetItem<T>> multiGet(Query query, Class<T> clazz, IndexCoordinates index) {
Assert.notNull(index, "index must not be null");
Assert.notEmpty(query.getIds(), "No Id defined for Query");
MultiGetRequest request = requestFactory.multiGetRequest(query, clazz, index);
MultiGetResponse result = execute(client -> client.mget(request, RequestOptions.DEFAULT));

View File

@ -205,7 +205,6 @@ public class ElasticsearchTemplate extends AbstractElasticsearchRestTransportTem
public <T> List<MultiGetItem<T>> multiGet(Query query, Class<T> clazz, IndexCoordinates index) {
Assert.notNull(index, "index must not be null");
Assert.notEmpty(query.getIds(), "No Ids defined for Query");
MultiGetRequestBuilder builder = requestFactory.multiGetRequestBuilder(client, query, clazz, index);

View File

@ -148,6 +148,8 @@ public interface ReactiveDocumentOperations {
* @param query the query defining the ids of the objects to get
* @param clazz the type of the object to be returned, used to determine the index
* @return flux with list of {@link MultiGetItem}s that contain the entities
* @see Query#multiGetQuery(Collection)
* @see Query#multiGetQueryWithRouting(List)
* @since 4.1
*/
<T> Flux<MultiGetItem<T>> multiGet(Query query, Class<T> clazz);
@ -159,6 +161,8 @@ public interface ReactiveDocumentOperations {
* @param clazz the type of the object to be returned
* @param index the index(es) from which the objects are read.
* @return flux with list of {@link MultiGetItem}s that contain the entities
* @see Query#multiGetQuery(Collection)
* @see Query#multiGetQueryWithRouting(List)
* @since 4.0
*/
<T> Flux<MultiGetItem<T>> multiGet(Query query, Class<T> clazz, IndexCoordinates index);

View File

@ -338,7 +338,6 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
Assert.notNull(index, "Index must not be null");
Assert.notNull(clazz, "Class must not be null");
Assert.notNull(query, "Query must not be null");
Assert.notEmpty(query.getIds(), "No Id define for Query");
DocumentCallback<T> callback = new ReadDocumentCallback<>(converter, clazz, index);

View File

@ -677,13 +677,13 @@ class RequestFactory {
FetchSourceContext fetchSourceContext = getFetchSourceContext(searchQuery);
if (!isEmpty(searchQuery.getIds())) {
if (!isEmpty(searchQuery.getIdsWithRouting())) {
String indexName = index.getIndexName();
for (String id : searchQuery.getIds()) {
MultiGetRequest.Item item = new MultiGetRequest.Item(indexName, id);
if (searchQuery.getRoute() != null) {
item = item.routing(searchQuery.getRoute());
for (Query.IdWithRouting idWithRouting : searchQuery.getIdsWithRouting()) {
MultiGetRequest.Item item = new MultiGetRequest.Item(indexName, idWithRouting.getId());
if (idWithRouting.getRouting() != null) {
item = item.routing(idWithRouting.getRouting());
}
// note: multiGet does not have fields, need to set sourceContext to filter

View File

@ -16,12 +16,15 @@
package org.springframework.data.elasticsearch.core.query;
import static java.util.Collections.*;
import static org.springframework.util.CollectionUtils.*;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
@ -29,7 +32,7 @@ import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* AbstractQuery
* BaseQuery
*
* @author Rizwan Idrees
* @author Mohsin Husen
@ -40,7 +43,7 @@ import org.springframework.util.Assert;
* @author Peter-Josef Meisch
* @author Peer Mueller
*/
abstract class AbstractQuery implements Query {
public class BaseQuery implements Query {
protected Pageable pageable = DEFAULT_PAGE;
@Nullable protected Sort sort;
@ -63,6 +66,7 @@ abstract class AbstractQuery implements Query {
@Nullable private List<Object> searchAfter;
protected List<RescorerQuery> rescorerQueries = new ArrayList<>();
@Nullable protected Boolean requestCache;
private List<IdWithRouting> idsWithRouting = Collections.emptyList();
@Override
@Nullable
@ -81,7 +85,7 @@ abstract class AbstractQuery implements Query {
Assert.notNull(pageable, "Pageable must not be null!");
this.pageable = pageable;
return (T) this.addSort(pageable.getSort());
return this.addSort(pageable.getSort());
}
@Override
@ -116,7 +120,7 @@ abstract class AbstractQuery implements Query {
@Override
@SuppressWarnings("unchecked")
public final <T extends Query> T addSort(Sort sort) {
public final <T extends Query> T addSort(@Nullable Sort sort) {
if (sort == null) {
return (T) this;
}
@ -139,14 +143,46 @@ abstract class AbstractQuery implements Query {
this.minScore = minScore;
}
@Nullable
/**
* Set Ids for a multi-get request with on this query.
*
* @param ids list of id values
*/
public void setIds(@Nullable Collection<String> ids) {
this.ids = ids;
}
@Override
@Nullable
public Collection<String> getIds() {
return ids;
}
public void setIds(Collection<String> ids) {
this.ids = ids;
@Override
public List<IdWithRouting> getIdsWithRouting() {
if (!isEmpty(idsWithRouting)) {
return Collections.unmodifiableList(idsWithRouting);
}
if (!isEmpty(ids)) {
return ids.stream().map(id -> new IdWithRouting(id, route)).collect(Collectors.toList());
}
return Collections.emptyList();
}
/**
* Set Ids with routing values for a multi-get request set on this query.
*
* @param idsWithRouting list of id values, must not be {@literal null}
* @since 4.3
*/
public void setIdsWithRouting(List<IdWithRouting> idsWithRouting) {
Assert.notNull(idsWithRouting, "idsWithRouting must not be null");
this.idsWithRouting = idsWithRouting;
}
@Nullable
@ -337,4 +373,5 @@ abstract class AbstractQuery implements Query {
public Boolean getRequestCache() {
return this.requestCache;
}
}

View File

@ -26,7 +26,7 @@ import org.springframework.util.Assert;
* @author Mark Paluch
* @author Peter-Josef Meisch
*/
public class CriteriaQuery extends AbstractQuery {
public class CriteriaQuery extends BaseQuery {
private Criteria criteria;

View File

@ -16,7 +16,7 @@
package org.springframework.data.elasticsearch.core.query;
import static java.util.Collections.*;
import static org.springframework.data.elasticsearch.core.query.AbstractQuery.*;
import static org.springframework.data.elasticsearch.core.query.BaseQuery.*;
import java.util.ArrayList;
import java.util.List;

View File

@ -42,7 +42,7 @@ import org.springframework.lang.Nullable;
* @author Martin Choraine
* @author Peter-Josef Meisch
*/
public class NativeSearchQuery extends AbstractQuery {
public class NativeSearchQuery extends BaseQuery {
@Nullable private final QueryBuilder query;
@Nullable private QueryBuilder filter;

View File

@ -26,6 +26,7 @@ import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* Query
@ -76,7 +77,7 @@ public interface Query {
* @param sort
* @return
*/
<T extends Query> T addSort(Sort sort);
<T extends Query> T addSort(@Nullable Sort sort);
/**
* @return null if not set
@ -137,13 +138,48 @@ public interface Query {
boolean getTrackScores();
/**
* Get Ids
*
* @return
* @return Get ids set on this query.
*/
@Nullable
Collection<String> getIds();
/**
* @return Ids with routing values used in a multi-get request.
* @see #multiGetQueryWithRouting(List)
* @since 4.3
*/
List<IdWithRouting> getIdsWithRouting();
/**
* Utility method to get a query for a multiget request
*
* @param idsWithRouting Ids with routing values used in a multi-get request.
* @return Query instance
*/
static Query multiGetQueryWithRouting(List<IdWithRouting> idsWithRouting) {
Assert.notNull(idsWithRouting, "idsWithRouting must not be null");
BaseQuery query = new BaseQuery();
query.setIdsWithRouting(idsWithRouting);
return query;
}
/**
* Utility method to get a query for a multiget request
*
* @param ids Ids used in a multi-get request.
* @return Query instance
*/
static Query multiGetQuery(Collection<String> ids) {
Assert.notNull(ids, "ids must not be null");
BaseQuery query = new BaseQuery();
query.setIds(ids);
return query;
}
/**
* Get route
*
@ -362,4 +398,31 @@ public interface Query {
enum SearchType {
QUERY_THEN_FETCH, DFS_QUERY_THEN_FETCH
}
/**
* Value class combining an id with a routing value. Used in multi-get requests.
*
* @since 4.3
*/
final class IdWithRouting {
private final String id;
@Nullable private final String routing;
public IdWithRouting(String id, @Nullable String routing) {
Assert.notNull(id, "id must not be null");
this.id = id;
this.routing = routing;
}
public String getId() {
return id;
}
@Nullable
public String getRouting() {
return routing;
}
}
}

View File

@ -24,7 +24,7 @@ import org.springframework.data.domain.Sort;
* @author Rizwan Idrees
* @author Mohsin Husen
*/
public class StringQuery extends AbstractQuery {
public class StringQuery extends BaseQuery {
private String source;

View File

@ -16,6 +16,7 @@
package org.springframework.data.elasticsearch.repository.support;
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.springframework.util.CollectionUtils.*;
import java.util.ArrayList;
import java.util.Collections;
@ -50,7 +51,6 @@ import org.springframework.data.util.StreamUtils;
import org.springframework.data.util.Streamable;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
/**
* Elasticsearch specific repository implementation. Likely to be used as target within
@ -149,7 +149,7 @@ public class SimpleElasticsearchRepository<T, ID> implements ElasticsearchReposi
List<T> result = new ArrayList<>();
Query idQuery = getIdQuery(ids);
if (CollectionUtils.isEmpty(idQuery.getIds())) {
if (isEmpty(idQuery.getIds())) {
return result;
}

View File

@ -30,7 +30,6 @@ import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import org.springframework.data.elasticsearch.core.query.FetchSourceFilterBuilder;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.SourceFilter;
import org.springframework.data.elasticsearch.junit.jupiter.ElasticsearchRestTemplateConfiguration;
@ -72,10 +71,8 @@ public class SourceFilterIntegrationTests {
void shouldOnlyReturnRequestedFieldsOnGMultiGet() {
// multiget has no fields, need sourcefilter here
Query query = new NativeSearchQueryBuilder() //
.withIds(Collections.singleton("42")) //
.withSourceFilter(new FetchSourceFilterBuilder().withIncludes("field2").build()) //
.build(); //
Query query = Query.multiGetQuery(Collections.singleton("42"));
query.addSourceFilter(new FetchSourceFilterBuilder().withIncludes("field2").build()); //
List<MultiGetItem<Entity>> entities = operations.multiGet(query, Entity.class);
@ -116,7 +113,7 @@ public class SourceFilterIntegrationTests {
@DisplayName("should not return excluded fields from SourceFilter on multiget")
void shouldNotReturnExcludedFieldsFromSourceFilterOnMultiGet() {
Query query = new NativeSearchQueryBuilder().withIds(Collections.singleton("42")).build();
Query query = Query.multiGetQuery(Collections.singleton("42"));
query.addSourceFilter(new SourceFilter() {
@Override
public String[] getIncludes() {
@ -168,7 +165,7 @@ public class SourceFilterIntegrationTests {
@DisplayName("should only return included fields from SourceFilter on multiget")
void shouldOnlyReturnIncludedFieldsFromSourceFilterOnMultiGet() {
Query query = new NativeSearchQueryBuilder().withIds(Collections.singleton("42")).build();
Query query = Query.multiGetQuery(Collections.singleton("42"));
query.addSourceFilter(new SourceFilter() {
@Override
public String[] getIncludes() {

View File

@ -17,9 +17,13 @@ package org.springframework.data.elasticsearch.core.routing;
import static org.assertj.core.api.Assertions.*;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
import org.apache.lucene.util.StringHelper;
import org.assertj.core.api.SoftAssertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
@ -31,8 +35,10 @@ import org.springframework.data.elasticsearch.annotations.Routing;
import org.springframework.data.elasticsearch.annotations.Setting;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.IndexOperations;
import org.springframework.data.elasticsearch.core.MultiGetItem;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.BaseQuery;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.junit.jupiter.ElasticsearchRestTemplateConfiguration;
import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest;
@ -48,6 +54,7 @@ import org.springframework.test.context.ContextConfiguration;
public class ElasticsearchOperationsRoutingTests {
private static final String INDEX = "routing-test";
private static final String ID_0 = "id0";
private static final String ID_1 = "id1";
private static final String ID_2 = "id2";
private static final String ID_3 = "id3";
@ -57,16 +64,22 @@ public class ElasticsearchOperationsRoutingTests {
@BeforeAll
static void beforeAll() {
// check that the used id values go to different shards of the index which is configured to have 5 shards.
// check that the used id values go to different shards of the index which is configured to have 7 shards.
// Elasticsearch uses the following function:
Function<String, Integer> calcShard = routing -> Math.floorMod(Murmur3HashFunction.hash(routing), 5);
Function<String, Integer> calcShard = routing -> Math.floorMod(Murmur3HashFunction.hash(routing), 7);
Integer shard0 = calcShard.apply(ID_0);
Integer shard1 = calcShard.apply(ID_1);
Integer shard2 = calcShard.apply(ID_2);
Integer shard3 = calcShard.apply(ID_3);
assertThat(shard0).isNotEqualTo(shard1);
assertThat(shard0).isNotEqualTo(shard2);
assertThat(shard0).isNotEqualTo(shard3);
assertThat(shard1).isNotEqualTo(shard2);
assertThat(shard1).isNotEqualTo(shard3);
assertThat(shard2).isNotEqualTo(shard3);
}
@ -84,7 +97,6 @@ public class ElasticsearchOperationsRoutingTests {
RoutingEntity entity = new RoutingEntity(ID_1, ID_2);
operations.save(entity);
indexOps.refresh();
RoutingEntity savedEntity = operations.withRouting(RoutingResolver.just(ID_2)).get(entity.id, RoutingEntity.class);
@ -97,7 +109,6 @@ public class ElasticsearchOperationsRoutingTests {
RoutingEntity entity = new RoutingEntity(ID_1, ID_2);
operations.save(entity);
indexOps.refresh();
String deletedId = operations.withRouting(RoutingResolver.just(ID_2)).delete(entity.id, IndexCoordinates.of(INDEX));
@ -106,11 +117,11 @@ public class ElasticsearchOperationsRoutingTests {
@Test // #1218
@DisplayName("should store data with different routing and get the routing in the search result")
void shouldStoreDataWithDifferentRoutingAndGetTheRoutingInTheSearchResult() {
RoutingEntity entity = new RoutingEntity(ID_1, ID_2);
operations.save(entity);
indexOps.refresh();
SearchHits<RoutingEntity> searchHits = operations.search(Query.findAll(), RoutingEntity.class);
@ -118,8 +129,38 @@ public class ElasticsearchOperationsRoutingTests {
assertThat(searchHits.getSearchHit(0).getRouting()).isEqualTo(ID_2);
}
@Test // #1954
@DisplayName("should use routing values in multiget")
void shouldUseRoutingValuesInMultiget() {
Consumer<String> save = (String id) -> operations.save(new RoutingEntity(id, id));
save.accept(ID_1);
save.accept(ID_2);
save.accept(ID_3);
Query query = Query.multiGetQueryWithRouting( //
Arrays.asList( //
new Query.IdWithRouting(ID_1, ID_1), //
new Query.IdWithRouting(ID_2, ID_2), //
new Query.IdWithRouting(ID_3, ID_3) //
) //
); //
// make sure that the correct routing values are used
((BaseQuery) query).setRoute(ID_0);
List<MultiGetItem<RoutingEntity>> multiGetItems = operations.multiGet(query, RoutingEntity.class);
SoftAssertions softly = new SoftAssertions();
softly.assertThat(multiGetItems).hasSize(3);
softly.assertThat(multiGetItems.get(0).hasItem()).isTrue();
softly.assertThat(multiGetItems.get(1).hasItem()).isTrue();
softly.assertThat(multiGetItems.get(2).hasItem()).isTrue();
softly.assertAll();
}
@Document(indexName = INDEX)
@Setting(shards = 5)
@Setting(shards = 7)
@Routing("routing")
static class RoutingEntity {
@Nullable @Id private String id;
@ -169,4 +210,31 @@ public class ElasticsearchOperationsRoutingTests {
return result;
}
}
/**
* Copied from org.elasticsearch.cluster.routing.Murmur3HashFunction from Elasticsearch 7.9.3
*/
static class Murmur3HashFunction {
private Murmur3HashFunction() {
// no instance
}
public static int hash(String routing) {
final byte[] bytesToHash = new byte[routing.length() * 2];
for (int i = 0; i < routing.length(); ++i) {
final char c = routing.charAt(i);
final byte b1 = (byte) c, b2 = (byte) (c >>> 8);
assert ((b1 & 0xFF) | ((b2 & 0xFF) << 8)) == c; // no information loss
bytesToHash[i * 2] = b1;
bytesToHash[i * 2 + 1] = b2;
}
return hash(bytesToHash, 0, bytesToHash.length);
}
public static int hash(byte[] bytes, int offset, int length) {
return StringHelper.murmurhash3_x86_32(bytes, offset, length, 0);
}
}
}