Allow an index to be partitioned with custom routing (#22274)

This change makes it possible for custom routing values to go to a subset of shards rather than
just a single shard. This lets an index exploit the spatial locality that custom routing can
provide while mitigating the likelihood of ending up with an imbalanced cluster or suffering
from a hot shard.
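
The shard selection described above can be sketched roughly as follows. This is a minimal illustration, not the code in this commit: the class and method names are hypothetical, `String.hashCode()` stands in for the Murmur3 hash the diff uses, and the index is assumed not to have been shrunk.

```java
import java.util.Set;
import java.util.TreeSet;

final class PartitionedRoutingSketch {
    // Stand-in hash for illustration only; the actual change hashes with Murmur3HashFunction.
    static int hash(String value) {
        return value.hashCode();
    }

    // The routing value selects a starting shard; the _id selects an offset inside the partition.
    static int shardId(String id, String routing, int numShards, int partitionSize) {
        int partitionOffset = Math.floorMod(hash(id), partitionSize);
        return Math.floorMod(hash(routing) + partitionOffset, numShards);
    }

    public static void main(String[] args) {
        Set<Integer> shards = new TreeSet<>();
        for (int i = 0; i < 1000; i++) {
            shards.add(shardId("doc_" + i, "tenant_a", 8, 3));
        }
        // All documents of one routing value land on at most partitionSize shards.
        System.out.println("shards used by tenant_a: " + shards);
    }
}
```

With a partition size of 1 the offset is always 0, so the sketch degenerates to plain custom routing where each routing value maps to exactly one shard.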

This is ideal for large multi-tenant indices with custom routing that suffer from one or both of
the following:
- The big tenants cannot fit into a single shard or there are so many of them that they will likely
end up on the same shard
- Tenants often have a surge in write traffic and a single shard cannot process it fast enough

Beyond that, this should also be useful for use cases where most queries are done under the context
of a specific field (e.g. a category) since it gives a hint at how the data can be stored to minimize
the number of shards to check per query. While a similar solution can be achieved with multiple
concrete indices or aliases per value today, those approaches break down for high-cardinality fields.
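
To illustrate why a routed query only has to check a few shards, here is a hedged sketch of how the set of shards for one routing value could be enumerated. The names are hypothetical, `String.hashCode()` is an assumed stand-in for the Murmur3 hash used in the diff, and the index is assumed not to have been shrunk.

```java
import java.util.Set;
import java.util.TreeSet;

final class RoutedSearchSketch {
    // Stand-in hash for illustration only; the actual change hashes with Murmur3HashFunction.
    static int hash(String value) {
        return value.hashCode();
    }

    // A routed search visits one shard per possible partition offset.
    static Set<Integer> searchShards(String routing, int numShards, int partitionSize) {
        Set<Integer> shards = new TreeSet<>();
        for (int offset = 0; offset < partitionSize; offset++) {
            shards.add(Math.floorMod(hash(routing) + offset, numShards));
        }
        return shards;
    }

    // A document is stored on the shard selected by the offset derived from its _id.
    static int documentShard(String id, String routing, int numShards, int partitionSize) {
        int offset = Math.floorMod(hash(id), partitionSize);
        return Math.floorMod(hash(routing) + offset, numShards);
    }

    public static void main(String[] args) {
        Set<Integer> shards = searchShards("category_42", 10, 4);
        int docShard = documentShard("doc_7", "category_42", 10, 4);
        System.out.println("search visits " + shards + ", doc_7 lives on shard " + docShard);
    }
}
```

Because both methods use the same offsets, every document written with a routing value is stored on one of the shards that a search with that routing value visits.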

A partitioned index requires that mappings have routing required, keeps the partition size
unchanged when shrinking an index (the partitions will shrink proportionally), and rejects
mappings that have parent/child relationships.
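
The proportional shrinking can be pictured with the sketch below. It follows the scaled shard computation shown in the diff (hash against the original shard count, then divide by the routing factor). The class and method names are hypothetical, and `String.hashCode()` is an assumed stand-in for Murmur3.

```java
import java.util.Set;
import java.util.TreeSet;

final class ShrunkPartitionSketch {
    // Stand-in hash for illustration only; the actual change hashes with Murmur3HashFunction.
    static int hash(String value) {
        return value.hashCode();
    }

    // Hash against the original shard count, then scale down by the routing factor.
    static int scaledShardId(String routing, int partitionOffset, int routingNumShards, int numShards) {
        int routingFactor = routingNumShards / numShards;
        return Math.floorMod(hash(routing) + partitionOffset, routingNumShards) / routingFactor;
    }

    // All shards a routing value can reach for the given partition size.
    static Set<Integer> partitionShards(String routing, int partitionSize, int routingNumShards, int numShards) {
        Set<Integer> shards = new TreeSet<>();
        for (int offset = 0; offset < partitionSize; offset++) {
            shards.add(scaledShardId(routing, offset, routingNumShards, numShards));
        }
        return shards;
    }

    public static void main(String[] args) {
        // An index created with 8 shards and partition size 3, then shrunk to 4, 2 and 1 shards.
        for (int numShards = 8; numShards >= 1; numShards /= 2) {
            System.out.println(numShards + " shards -> " + partitionShards("tenant_a", 3, 8, numShards));
        }
    }
}
```

The partition size setting stays at 3 throughout, yet the number of distinct shards a routing value touches drops as the index is shrunk.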

Closes #21585
Scott Somerville 2017-01-18 02:51:23 -05:00 committed by Adrien Grand
parent 3b92179e09
commit 372812da98
14 changed files with 597 additions and 18 deletions

@ -131,6 +131,9 @@ public class TransportShrinkAction extends TransportMasterNodeAction<ShrinkReque
}
}
if (IndexMetaData.INDEX_ROUTING_PARTITION_SIZE_SETTING.exists(targetIndexSettings)) {
throw new IllegalArgumentException("cannot provide a routing partition size value when shrinking an index");
}
targetIndex.cause("shrink_index");
Settings.Builder settingsBuilder = Settings.builder().put(targetIndexSettings);
settingsBuilder.put("index.number_of_shards", numShards);

@ -23,6 +23,7 @@ import com.carrotsearch.hppc.LongArrayList;
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.Diff;
@ -193,6 +194,10 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
public static final Setting<Boolean> INDEX_SHADOW_REPLICAS_SETTING =
Setting.boolSetting(SETTING_SHADOW_REPLICAS, false, Property.IndexScope);
public static final String SETTING_ROUTING_PARTITION_SIZE = "index.routing_partition_size";
public static final Setting<Integer> INDEX_ROUTING_PARTITION_SIZE_SETTING =
Setting.intSetting(SETTING_ROUTING_PARTITION_SIZE, 1, 1, Property.IndexScope);
public static final String SETTING_SHARED_FILESYSTEM = "index.shared_filesystem";
public static final Setting<Boolean> INDEX_SHARED_FILESYSTEM_SETTING =
Setting.boolSetting(SETTING_SHARED_FILESYSTEM, false, Property.IndexScope);
@ -273,6 +278,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
public static final String INDEX_STATE_FILE_PREFIX = "state-";
private final int routingNumShards;
private final int routingFactor;
private final int routingPartitionSize;
private final int numberOfShards;
private final int numberOfReplicas;
@ -311,7 +317,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
ImmutableOpenMap<String, Custom> customs, ImmutableOpenIntMap<Set<String>> inSyncAllocationIds,
DiscoveryNodeFilters requireFilters, DiscoveryNodeFilters initialRecoveryFilters, DiscoveryNodeFilters includeFilters, DiscoveryNodeFilters excludeFilters,
Version indexCreatedVersion, Version indexUpgradedVersion, org.apache.lucene.util.Version minimumCompatibleLuceneVersion,
int routingNumShards, ActiveShardCount waitForActiveShards) {
int routingNumShards, int routingPartitionSize, ActiveShardCount waitForActiveShards) {
this.index = index;
this.version = version;
@ -335,6 +341,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
this.minimumCompatibleLuceneVersion = minimumCompatibleLuceneVersion;
this.routingNumShards = routingNumShards;
this.routingFactor = routingNumShards / numberOfShards;
this.routingPartitionSize = routingPartitionSize;
this.waitForActiveShards = waitForActiveShards;
assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards;
}
@ -414,6 +421,14 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
return numberOfReplicas;
}
public int getRoutingPartitionSize() {
return routingPartitionSize;
}
public boolean isRoutingPartitionedIndex() {
return routingPartitionSize != 1;
}
public int getTotalNumberOfShards() {
return totalNumberOfShards;
}
@ -810,6 +825,11 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
return routingNumShards == null ? numberOfShards() : routingNumShards;
}
/**
* Returns the number of shards.
*
* @return the provided value or -1 if it has not been set.
*/
public int numberOfShards() {
return settings.getAsInt(SETTING_NUMBER_OF_SHARDS, -1);
}
@ -819,10 +839,29 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
return this;
}
/**
* Returns the number of replicas.
*
* @return the provided value or -1 if it has not been set.
*/
public int numberOfReplicas() {
return settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, -1);
}
public Builder routingPartitionSize(int routingPartitionSize) {
settings = Settings.builder().put(settings).put(SETTING_ROUTING_PARTITION_SIZE, routingPartitionSize).build();
return this;
}
/**
* Returns the routing partition size.
*
* @return the provided value or -1 if it has not been set.
*/
public int routingPartitionSize() {
return settings.getAsInt(SETTING_ROUTING_PARTITION_SIZE, -1);
}
public Builder creationDate(long creationDate) {
settings = Settings.builder().put(settings).put(SETTING_CREATION_DATE, creationDate).build();
return this;
@ -886,7 +925,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
}
public Builder putInSyncAllocationIds(int shardId, Set<String> allocationIds) {
inSyncAllocationIds.put(shardId, new HashSet(allocationIds));
inSyncAllocationIds.put(shardId, new HashSet<>(allocationIds));
return this;
}
@ -965,6 +1004,12 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
throw new IllegalArgumentException("must specify non-negative number of shards for index [" + index + "]");
}
int routingPartitionSize = INDEX_ROUTING_PARTITION_SIZE_SETTING.get(settings);
if (routingPartitionSize != 1 && routingPartitionSize >= getRoutingNumShards()) {
throw new IllegalArgumentException("routing partition size [" + routingPartitionSize + "] should be a positive number"
+ " less than the number of shards [" + getRoutingNumShards() + "] for [" + index + "]");
}
// fill missing slots in inSyncAllocationIds with empty set if needed and make all entries immutable
ImmutableOpenIntMap.Builder<Set<String>> filledInSyncAllocationIds = ImmutableOpenIntMap.builder();
for (int i = 0; i < numberOfShards; i++) {
@ -1033,7 +1078,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
final String uuid = settings.get(SETTING_INDEX_UUID, INDEX_UUID_NA_VALUE);
return new IndexMetaData(new Index(index, uuid), version, primaryTerms, state, numberOfShards, numberOfReplicas, tmpSettings, mappings.build(),
tmpAliases.build(), customs.build(), filledInSyncAllocationIds.build(), requireFilters, initialRecoveryFilters, includeFilters, excludeFilters,
indexCreatedVersion, indexUpgradedVersion, minimumCompatibleLuceneVersion, getRoutingNumShards(), waitForActiveShards);
indexCreatedVersion, indexUpgradedVersion, minimumCompatibleLuceneVersion, getRoutingNumShards(), routingPartitionSize, waitForActiveShards);
}
public static void toXContent(IndexMetaData indexMetaData, XContentBuilder builder, ToXContent.Params params) throws IOException {

@ -21,6 +21,7 @@ package org.elasticsearch.cluster.metadata;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.CollectionUtil;
@ -592,6 +593,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
.put(IndexMetaData.SETTING_VERSION_CREATED, sourceMetaData.getCreationVersion())
.put(IndexMetaData.SETTING_VERSION_UPGRADED, sourceMetaData.getUpgradedVersion())
.put(sourceMetaData.getSettings().filter(analysisSimilarityPredicate))
.put(IndexMetaData.SETTING_ROUTING_PARTITION_SIZE, sourceMetaData.getRoutingPartitionSize())
.put(IndexMetaData.INDEX_SHRINK_SOURCE_NAME.getKey(), shrinkFromIndex.getName())
.put(IndexMetaData.INDEX_SHRINK_SOURCE_UUID.getKey(), shrinkFromIndex.getUUID());
if (sourceMetaData.getMinimumCompatibleVersion() != null) {

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.metadata;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.support.master.MasterNodeRequest;
@ -197,12 +198,16 @@ public class MetaDataIndexTemplateService extends AbstractComponent {
Index createdIndex = null;
final String temporaryIndexName = UUIDs.randomBase64UUID();
try {
// use the provided values, otherwise just pick valid dummy values
int dummyPartitionSize = IndexMetaData.INDEX_ROUTING_PARTITION_SIZE_SETTING.get(request.settings);
int dummyShards = request.settings.getAsInt(IndexMetaData.SETTING_NUMBER_OF_SHARDS,
dummyPartitionSize == 1 ? 1 : dummyPartitionSize + 1);
//create index service for parsing and validating "mappings"
Settings dummySettings = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(request.settings)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, dummyShards)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())
.build();

@ -92,13 +92,10 @@ public class OperationRouting extends AbstractComponent {
final Set<String> effectiveRouting = routing.get(index);
if (effectiveRouting != null) {
for (String r : effectiveRouting) {
int shardId = generateShardId(indexMetaData, null, r);
IndexShardRoutingTable indexShard = indexRouting.shard(shardId);
if (indexShard == null) {
throw new ShardNotFoundException(new ShardId(indexRouting.getIndex(), shardId));
final int routingPartitionSize = indexMetaData.getRoutingPartitionSize();
for (int partitionOffset = 0; partitionOffset < routingPartitionSize; partitionOffset++) {
set.add(shardRoutingTable(indexRouting, calculateScaledShardId(indexMetaData, r, partitionOffset)));
}
// we might get duplicates, but that's ok, they will override one another
set.add(indexShard);
}
} else {
for (IndexShardRoutingTable indexShard : indexRouting) {
@ -187,6 +184,14 @@ public class OperationRouting extends AbstractComponent {
}
}
private IndexShardRoutingTable shardRoutingTable(IndexRoutingTable indexRouting, int shardId) {
IndexShardRoutingTable indexShard = indexRouting.shard(shardId);
if (indexShard == null) {
throw new ShardNotFoundException(new ShardId(indexRouting.getIndex(), shardId));
}
return indexShard;
}
protected IndexRoutingTable indexRoutingTable(ClusterState clusterState, String index) {
IndexRoutingTable indexRouting = clusterState.routingTable().index(index);
if (indexRouting == null) {
@ -213,15 +218,33 @@ public class OperationRouting extends AbstractComponent {
return new ShardId(indexMetaData.getIndex(), generateShardId(indexMetaData, id, routing));
}
static int generateShardId(IndexMetaData indexMetaData, String id, @Nullable String routing) {
final int hash;
static int generateShardId(IndexMetaData indexMetaData, @Nullable String id, @Nullable String routing) {
final String effectiveRouting;
final int partitionOffset;
if (routing == null) {
hash = Murmur3HashFunction.hash(id);
assert(indexMetaData.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
effectiveRouting = id;
} else {
hash = Murmur3HashFunction.hash(routing);
effectiveRouting = routing;
}
if (indexMetaData.isRoutingPartitionedIndex()) {
partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetaData.getRoutingPartitionSize());
} else {
// we would have still got 0 above but this check just saves us an unnecessary hash calculation
partitionOffset = 0;
}
return calculateScaledShardId(indexMetaData, effectiveRouting, partitionOffset);
}
private static int calculateScaledShardId(IndexMetaData indexMetaData, String effectiveRouting, int partitionOffset) {
final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;
// we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
// of original index to hash documents
return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
}
}

@ -69,6 +69,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexMetaData.INDEX_AUTO_EXPAND_REPLICAS_SETTING,
IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING,
IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING,
IndexMetaData.INDEX_ROUTING_PARTITION_SIZE_SETTING,
IndexMetaData.INDEX_SHADOW_REPLICAS_SETTING,
IndexMetaData.INDEX_SHARED_FILESYSTEM_SETTING,
IndexMetaData.INDEX_READ_ONLY_SETTING,

@ -21,6 +21,7 @@ package org.elasticsearch.index.mapper;
import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.DelegatingAnalyzerWrapper;
@ -384,6 +385,7 @@ public class MapperService extends AbstractIndexComponent implements Closeable {
MapperUtils.collect(newMapper.mapping().root(), objectMappers, fieldMappers);
checkFieldUniqueness(newMapper.type(), objectMappers, fieldMappers, fullPathObjectMappers, fieldTypes);
checkObjectsCompatibility(objectMappers, updateAllTypes, fullPathObjectMappers);
checkPartitionedIndexConstraints(newMapper);
// update lookup data-structures
// this will in particular make sure that the merged fields are compatible with other types
@ -598,6 +600,20 @@ public class MapperService extends AbstractIndexComponent implements Closeable {
}
}
private void checkPartitionedIndexConstraints(DocumentMapper newMapper) {
if (indexSettings.getIndexMetaData().isRoutingPartitionedIndex()) {
if (newMapper.parentFieldMapper().active()) {
throw new IllegalArgumentException("mapping type name [" + newMapper.type() + "] cannot have a "
+ "_parent field for the partitioned index [" + indexSettings.getIndex().getName() + "]");
}
if (!newMapper.routingFieldMapper().required()) {
throw new IllegalArgumentException("mapping type [" + newMapper.type() + "] must have routing "
+ "required for partitioned index [" + indexSettings.getIndex().getName() + "]");
}
}
}
public DocumentMapper parse(String mappingType, CompressedXContent mappingSource, boolean applyDefault) throws MapperParsingException {
return documentParser.parse(mappingType, mappingSource, applyDefault ? defaultMappingSource : null);
}

@ -41,6 +41,7 @@ import org.elasticsearch.test.ESIntegTestCase.Scope;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@ -307,4 +308,31 @@ public class CreateIndexIT extends ESIntegTestCase {
.build();
assertFalse(client().admin().indices().prepareCreate("test-idx-3").setSettings(settings).setTimeout("100ms").get().isShardsAcked());
}
public void testInvalidPartitionSize() {
BiFunction<Integer, Integer, Boolean> createPartitionedIndex = (shards, partitionSize) -> {
CreateIndexResponse response;
try {
response = prepareCreate("test_" + shards + "_" + partitionSize)
.setSettings(Settings.builder()
.put("index.number_of_shards", shards)
.put("index.routing_partition_size", partitionSize))
.execute().actionGet();
} catch (IllegalStateException | IllegalArgumentException e) {
return false;
}
return response.isAcknowledged();
};
assertFalse(createPartitionedIndex.apply(3, 6));
assertFalse(createPartitionedIndex.apply(3, 0));
assertFalse(createPartitionedIndex.apply(3, 3));
assertTrue(createPartitionedIndex.apply(3, 1));
assertTrue(createPartitionedIndex.apply(3, 2));
assertTrue(createPartitionedIndex.apply(1, 1));
}
}

@ -22,7 +22,6 @@ import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
@ -35,6 +34,7 @@ import org.elasticsearch.threadpool.TestThreadPool;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -43,8 +43,8 @@ import java.util.TreeMap;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.object.HasToString.hasToString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.object.HasToString.hasToString;
public class OperationRoutingTests extends ESTestCase{
@ -75,6 +75,142 @@ public class OperationRoutingTests extends ESTestCase{
}
}
public void testPartitionedIndex() {
// make sure the same routing value always has each _id fall within the configured partition size
for (int shards = 1; shards < 5; shards++) {
for (int partitionSize = 1; partitionSize == 1 || partitionSize < shards; partitionSize++) {
IndexMetaData metaData = IndexMetaData.builder("test")
.settings(settings(Version.CURRENT))
.numberOfShards(shards)
.routingPartitionSize(partitionSize)
.numberOfReplicas(1)
.build();
for (int i = 0; i < 20; i++) {
String routing = randomUnicodeOfLengthBetween(1, 50);
Set<Integer> shardSet = new HashSet<>();
for (int k = 0; k < 150; k++) {
String id = randomUnicodeOfLengthBetween(1, 50);
shardSet.add(OperationRouting.generateShardId(metaData, id, routing));
}
assertEquals(partitionSize, shardSet.size());
}
}
}
}
public void testPartitionedIndexShrunk() {
Map<String, Map<String, Integer>> routingIdToShard = new HashMap<>();
Map<String, Integer> routingA = new HashMap<>();
routingA.put("a_0", 1);
routingA.put("a_1", 2);
routingA.put("a_2", 2);
routingA.put("a_3", 2);
routingA.put("a_4", 1);
routingA.put("a_5", 2);
routingIdToShard.put("a", routingA);
Map<String, Integer> routingB = new HashMap<>();
routingB.put("b_0", 0);
routingB.put("b_1", 0);
routingB.put("b_2", 0);
routingB.put("b_3", 0);
routingB.put("b_4", 3);
routingB.put("b_5", 3);
routingIdToShard.put("b", routingB);
Map<String, Integer> routingC = new HashMap<>();
routingC.put("c_0", 1);
routingC.put("c_1", 1);
routingC.put("c_2", 0);
routingC.put("c_3", 0);
routingC.put("c_4", 0);
routingC.put("c_5", 1);
routingIdToShard.put("c", routingC);
Map<String, Integer> routingD = new HashMap<>();
routingD.put("d_0", 2);
routingD.put("d_1", 2);
routingD.put("d_2", 3);
routingD.put("d_3", 3);
routingD.put("d_4", 3);
routingD.put("d_5", 3);
routingIdToShard.put("d", routingD);
IndexMetaData metaData = IndexMetaData.builder("test")
.settings(settings(Version.CURRENT))
.setRoutingNumShards(8)
.numberOfShards(4)
.routingPartitionSize(3)
.numberOfReplicas(1)
.build();
for (Map.Entry<String, Map<String, Integer>> routingIdEntry : routingIdToShard.entrySet()) {
String routing = routingIdEntry.getKey();
for (Map.Entry<String, Integer> idEntry : routingIdEntry.getValue().entrySet()) {
String id = idEntry.getKey();
int shard = idEntry.getValue();
assertEquals(shard, OperationRouting.generateShardId(metaData, id, routing));
}
}
}
public void testPartitionedIndexBWC() {
Map<String, Map<String, Integer>> routingIdToShard = new HashMap<>();
Map<String, Integer> routingA = new HashMap<>();
routingA.put("a_0", 3);
routingA.put("a_1", 2);
routingA.put("a_2", 2);
routingA.put("a_3", 3);
routingIdToShard.put("a", routingA);
Map<String, Integer> routingB = new HashMap<>();
routingB.put("b_0", 5);
routingB.put("b_1", 0);
routingB.put("b_2", 0);
routingB.put("b_3", 0);
routingIdToShard.put("b", routingB);
Map<String, Integer> routingC = new HashMap<>();
routingC.put("c_0", 4);
routingC.put("c_1", 4);
routingC.put("c_2", 3);
routingC.put("c_3", 4);
routingIdToShard.put("c", routingC);
Map<String, Integer> routingD = new HashMap<>();
routingD.put("d_0", 3);
routingD.put("d_1", 4);
routingD.put("d_2", 4);
routingD.put("d_3", 4);
routingIdToShard.put("d", routingD);
IndexMetaData metaData = IndexMetaData.builder("test")
.settings(settings(Version.CURRENT))
.numberOfShards(6)
.routingPartitionSize(2)
.numberOfReplicas(1)
.build();
for (Map.Entry<String, Map<String, Integer>> routingIdEntry : routingIdToShard.entrySet()) {
String routing = routingIdEntry.getKey();
for (Map.Entry<String, Integer> idEntry : routingIdEntry.getValue().entrySet()) {
String id = idEntry.getKey();
int shard = idEntry.getValue();
assertEquals(shard, OperationRouting.generateShardId(metaData, id, routing));
}
}
}
/**
* Ensures that all changes to the hash-function / shard selection are BWC
*/

@ -234,4 +234,37 @@ public class MapperServiceTests extends ESSingleNodeTestCase {
MergeReason.MAPPING_UPDATE, random().nextBoolean()));
assertThat(e.getMessage(), containsString("[_all] is disabled in 6.0"));
}
public void testPartitionedConstraints() {
// partitioned index must have routing
IllegalArgumentException noRoutingException = expectThrows(IllegalArgumentException.class, () -> {
client().admin().indices().prepareCreate("test-index")
.addMapping("type", "{\"type\":{}}")
.setSettings(Settings.builder()
.put("index.number_of_shards", 4)
.put("index.routing_partition_size", 2))
.execute().actionGet();
});
assertTrue(noRoutingException.getMessage(), noRoutingException.getMessage().contains("must have routing"));
// partitioned index cannot have parent/child relationships
IllegalArgumentException parentException = expectThrows(IllegalArgumentException.class, () -> {
client().admin().indices().prepareCreate("test-index")
.addMapping("parent", "{\"parent\":{\"_routing\":{\"required\":true}}}")
.addMapping("child", "{\"child\": {\"_routing\":{\"required\":true}, \"_parent\": {\"type\": \"parent\"}}}")
.setSettings(Settings.builder()
.put("index.number_of_shards", 4)
.put("index.routing_partition_size", 2))
.execute().actionGet();
});
assertTrue(parentException.getMessage(), parentException.getMessage().contains("cannot have a _parent field"));
// valid partitioned index
assertTrue(client().admin().indices().prepareCreate("test-index")
.addMapping("type", "{\"type\":{\"_routing\":{\"required\":true}}}")
.setSettings(Settings.builder()
.put("index.number_of_shards", 4)
.put("index.routing_partition_size", 2))
.execute().actionGet().isAcknowledged());
}
}

@ -37,7 +37,6 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.InvalidAliasNameException;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.After;
import java.io.IOException;
@ -306,7 +305,6 @@ public class SimpleIndexTemplateIT extends ESIntegTestCase {
"get template with " + Arrays.toString(names));
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/8802")
public void testBrokenMapping() throws Exception {
// clean all templates setup by the framework.
client().admin().indices().prepareDeleteTemplate("*").get();
@ -764,4 +762,63 @@ public class SimpleIndexTemplateIT extends ESIntegTestCase {
assertEquals("value1", searchResponse.getHits().getAt(0).field("field1").value().toString());
assertNull(searchResponse.getHits().getAt(0).field("field2"));
}
public void testPartitionedTemplate() throws Exception {
// clean all templates setup by the framework.
client().admin().indices().prepareDeleteTemplate("*").get();
// check get all templates on an empty index.
GetIndexTemplatesResponse response = client().admin().indices().prepareGetTemplates().get();
assertThat(response.getIndexTemplates(), empty());
// provide more partitions than shards
IllegalArgumentException eBadSettings = expectThrows(IllegalArgumentException.class,
() -> client().admin().indices().preparePutTemplate("template_1")
.setPatterns(Collections.singletonList("te*"))
.setSettings(Settings.builder()
.put("index.number_of_shards", "5")
.put("index.routing_partition_size", "6"))
.get());
assertThat(eBadSettings.getMessage(), containsString("partition size [6] should be a positive number "
+ "less than the number of shards [5]"));
// provide an invalid mapping for a partitioned index
IllegalArgumentException eBadMapping = expectThrows(IllegalArgumentException.class,
() -> client().admin().indices().preparePutTemplate("template_2")
.setPatterns(Collections.singletonList("te*"))
.addMapping("type", "{\"type\":{\"_routing\":{\"required\":false}}}")
.setSettings(Settings.builder()
.put("index.number_of_shards", "6")
.put("index.routing_partition_size", "3"))
.get());
assertThat(eBadMapping.getMessage(), containsString("must have routing required for partitioned index"));
// no templates yet
response = client().admin().indices().prepareGetTemplates().get();
assertEquals(0, response.getIndexTemplates().size());
// a valid configuration that only provides the partition size
assertAcked(client().admin().indices().preparePutTemplate("just_partitions")
.setPatterns(Collections.singletonList("te*"))
.setSettings(Settings.builder()
.put("index.routing_partition_size", "6"))
.get());
// create an index with too few shards
IllegalArgumentException eBadIndex = expectThrows(IllegalArgumentException.class,
() -> prepareCreate("test_bad", Settings.builder()
.put("index.number_of_shards", 5))
.get());
assertThat(eBadIndex.getMessage(), containsString("partition size [6] should be a positive number "
+ "less than the number of shards [5]"));
// finally, create a valid index
prepareCreate("test_good", Settings.builder()
.put("index.number_of_shards", 7))
.get();
GetSettingsResponse getSettingsResponse = client().admin().indices().prepareGetSettings("test_good").get();
assertEquals("6", getSettingsResponse.getIndexToSettings().get("test_good").getAsMap().get("index.routing_partition_size"));
}
}

@ -0,0 +1,197 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.routing;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ESIntegTestCase;
import org.mockito.internal.util.collections.Sets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public class PartitionedRoutingIT extends ESIntegTestCase {
public void testVariousPartitionSizes() throws Exception {
for (int shards = 1; shards <= 4; shards++) {
for (int partitionSize = 1; partitionSize < shards; partitionSize++) {
String index = "index_" + shards + "_" + partitionSize;
client().admin().indices().prepareCreate(index)
.setSettings(Settings.builder()
.put("index.number_of_shards", shards)
.put("index.routing_partition_size", partitionSize))
.addMapping("type", "{\"type\":{\"_routing\":{\"required\":true}}}")
.execute().actionGet();
ensureGreen();
Map<String, Set<String>> routingToDocumentIds = generateRoutedDocumentIds(index);
verifyGets(index, routingToDocumentIds);
verifyBroadSearches(index, routingToDocumentIds, shards);
verifyRoutedSearches(index, routingToDocumentIds, Sets.newSet(partitionSize));
}
}
}
public void testShrinking() throws Exception {
// creates random routing groups and repeatedly halves the index until it is down to 1 shard
// verifying that the count is correct for each shrunken index
final int partitionSize = 3;
final int originalShards = 8;
int currentShards = originalShards;
String index = "index_" + currentShards;
client().admin().indices().prepareCreate(index)
.setSettings(Settings.builder()
.put("index.number_of_shards", currentShards)
.put("index.routing_partition_size", partitionSize))
.addMapping("type", "{\"type\":{\"_routing\":{\"required\":true}}}")
.execute().actionGet();
ensureGreen();
Map<String, Set<String>> routingToDocumentIds = generateRoutedDocumentIds(index);
while (true) {
int factor = originalShards / currentShards;
verifyGets(index, routingToDocumentIds);
verifyBroadSearches(index, routingToDocumentIds, currentShards);
// we need the floor and ceiling of the routing_partition_size / factor since the partition size of the shrunken
// index will be one of those, depending on the routing value
verifyRoutedSearches(index, routingToDocumentIds,
Math.floorDiv(partitionSize, factor) == 0 ?
Sets.newSet(1, 2) :
Sets.newSet(Math.floorDiv(partitionSize, factor), -Math.floorDiv(-partitionSize, factor)));
client().admin().indices().prepareUpdateSettings(index)
.setSettings(Settings.builder()
.put("index.routing.allocation.require._name", client().admin().cluster().prepareState().get().getState().nodes()
.getDataNodes().values().toArray(DiscoveryNode.class)[0].getName())
.put("index.blocks.write", true)).get();
ensureGreen();
currentShards = Math.floorDiv(currentShards, 2);
if (currentShards == 0) {
break;
}
String previousIndex = index;
index = "index_" + currentShards;
logger.info("--> shrinking index [" + previousIndex + "] to [" + index + "]");
client().admin().indices().prepareShrinkIndex(previousIndex, index)
.setSettings(Settings.builder()
.put("index.number_of_shards", currentShards)
.build()).get();
ensureGreen();
}
}
private void verifyRoutedSearches(String index, Map<String, Set<String>> routingToDocumentIds, Set<Integer> expectedShards) {
for (Map.Entry<String, Set<String>> routingEntry : routingToDocumentIds.entrySet()) {
String routing = routingEntry.getKey();
int expectedDocuments = routingEntry.getValue().size();
SearchResponse response = client().prepareSearch()
.setQuery(QueryBuilders.termQuery("_routing", routing))
.setRouting(routing)
.setIndices(index)
.setSize(100)
.execute().actionGet();
logger.info("--> routed search on index [" + index + "] visited [" + response.getTotalShards()
+ "] shards for routing [" + routing + "] and got hits [" + response.getHits().totalHits() + "]");
assertTrue(response.getTotalShards() + " was not in " + expectedShards + " for " + index,
expectedShards.contains(response.getTotalShards()));
assertEquals(expectedDocuments, response.getHits().totalHits());
Set<String> found = new HashSet<>();
response.getHits().forEach(h -> found.add(h.getId()));
assertEquals(routingEntry.getValue(), found);
}
}
private void verifyBroadSearches(String index, Map<String, Set<String>> routingToDocumentIds, int expectedShards) {
for (Map.Entry<String, Set<String>> routingEntry : routingToDocumentIds.entrySet()) {
String routing = routingEntry.getKey();
int expectedDocuments = routingEntry.getValue().size();
SearchResponse response = client().prepareSearch()
.setQuery(QueryBuilders.termQuery("_routing", routing))
.setIndices(index)
.setSize(100)
.execute().actionGet();
assertEquals(expectedShards, response.getTotalShards());
assertEquals(expectedDocuments, response.getHits().totalHits());
Set<String> found = new HashSet<>();
response.getHits().forEach(h -> found.add(h.getId()));
assertEquals(routingEntry.getValue(), found);
}
}
private void verifyGets(String index, Map<String, Set<String>> routingToDocumentIds) {
for (Map.Entry<String, Set<String>> routingEntry : routingToDocumentIds.entrySet()) {
String routing = routingEntry.getKey();
for (String id : routingEntry.getValue()) {
assertTrue(client().prepareGet(index, "type", id).setRouting(routing).execute().actionGet().isExists());
}
}
}
private Map<String, Set<String>> generateRoutedDocumentIds(String index) {
Map<String, Set<String>> routingToDocumentIds = new HashMap<>();
int numRoutingValues = randomIntBetween(5, 15);
for (int i = 0; i < numRoutingValues; i++) {
String routingValue = String.valueOf(i);
int numDocuments = randomIntBetween(10, 100);
routingToDocumentIds.put(routingValue, new HashSet<>());
for (int k = 0; k < numDocuments; k++) {
String id = routingValue + "_" + k;
routingToDocumentIds.get(routingValue).add(id);
client().prepareIndex(index, "type", id)
.setRouting(routingValue)
.setSource("foo", "bar")
.get();
}
}
client().admin().indices().prepareRefresh(index).get();
return routingToDocumentIds;
}
}


@ -79,6 +79,13 @@ Checking shards may take a lot of time on large indices.
which uses https://en.wikipedia.org/wiki/DEFLATE[DEFLATE] for a higher
compression ratio, at the expense of slower stored fields performance.
[[routing-partition-size]] `index.routing_partition_size`::
The number of shards a custom <<mapping-routing-field,routing>> value can go to.
Defaults to 1 and can only be set at index creation time. This value must be less
than `index.number_of_shards`, unless `index.number_of_shards` is also 1.
See <<routing-index-partition>> for more details about how this setting is used.
[float]
[[dynamic-index-settings]]
=== Dynamic index settings


@ -109,3 +109,29 @@ documents with the same `_id` might end up on different shards if indexed with
different `_routing` values.
It is up to the user to ensure that IDs are unique across the index.
[[routing-index-partition]]
==== Routing to an index partition
An index can be configured such that custom routing values will go to a subset of the shards rather
than a single shard. This helps mitigate the risk of ending up with an imbalanced cluster while still
limiting the number of shards each search has to visit.
This is done by providing the index level setting <<routing-partition-size,`index.routing_partition_size`>> at index creation.
As the partition size increases, the data becomes more evenly distributed, at the
expense of having to search more shards per request.
When this setting is present, the formula for calculating the shard becomes:
shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards
That is, the `_routing` field is used to calculate a set of shards within the index and then the
`_id` is used to pick a shard within that set.
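The formula above can be sketched in plain Java to see how one routing value fans out over a bounded set of shards. This is an illustrative sketch only: the class name, the `shardNum` helper, and the use of `String.hashCode()` as the hash are assumptions made for the example, and the hash is not the one Elasticsearch uses internally.

```java
import java.util.Set;
import java.util.TreeSet;

public class PartitionRoutingSketch {

    // Stand-in hash for illustration only (assumption); Elasticsearch uses its own hash function.
    static int hash(String value) {
        return value.hashCode();
    }

    // shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards
    static int shardNum(String routing, String id, int routingPartitionSize, int numPrimaryShards) {
        // The _id picks an offset inside the partition: 0 .. routingPartitionSize - 1.
        int partitionOffset = Math.floorMod(hash(id), routingPartitionSize);
        // The _routing value picks where the partition starts; long arithmetic avoids int overflow.
        long shifted = (long) hash(routing) + partitionOffset;
        return (int) Math.floorMod(shifted, (long) numPrimaryShards);
    }

    public static void main(String[] args) {
        int numPrimaryShards = 8;
        int routingPartitionSize = 3;
        Set<Integer> shards = new TreeSet<>();
        for (int k = 0; k < 1000; k++) {
            shards.add(shardNum("tenant-a", "tenant-a_" + k, routingPartitionSize, numPrimaryShards));
        }
        // At most routingPartitionSize distinct shards are collected for a single routing value.
        System.out.println("shards used by tenant-a: " + shards);
    }
}
```

Whatever hash is used, documents sharing a `_routing` value can land on at most `routing_partition_size` distinct shards, which is the bound a routed search relies on.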
To enable this feature, `index.routing_partition_size` must be set to a value greater than 1 and
less than `index.number_of_shards`.
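The constraints on the setting can be written down as a small check. The sketch below is a hypothetical helper, not part of the Elasticsearch API; the class and method names are assumptions for illustration. It treats the default value of 1 as always acceptable, including the case where the index has a single shard.

```java
public class PartitionSizeCheck {

    // Hypothetical helper (assumption): true if the partition size is acceptable for the shard count.
    static boolean isValidPartitionSize(int routingPartitionSize, int numberOfShards) {
        if (routingPartitionSize < 1 || numberOfShards < 1) {
            return false;
        }
        if (routingPartitionSize == 1) {
            return true; // default value; routing goes to a single shard
        }
        return routingPartitionSize < numberOfShards;
    }

    // Hypothetical helper (assumption): the index is partitioned only when the size exceeds 1.
    static boolean isPartitioned(int routingPartitionSize) {
        return routingPartitionSize > 1;
    }

    public static void main(String[] args) {
        System.out.println(isValidPartitionSize(3, 8)); // greater than 1 and less than the shard count
        System.out.println(isValidPartitionSize(8, 8)); // not less than the shard count
        System.out.println(isPartitioned(1));           // default value leaves the feature off
    }
}
```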
Once enabled, the partitioned index will have the following limitations:
* Mappings with parent-child relationships cannot be created within it.
* All mappings within the index must have the `_routing` field marked as required.