Mappings: Wait for mappings to be available on the primary before indexing.

In some cases it might happen that a mapping which is already available on the
master node is not available yet on the node that holds the primary shard.
This commit changes indexing on the primary shard so that if a dynamic update
is triggered then the index operation is re-tried until required mappings are
available locally (using cluster state observing).
This commit is contained in:
Adrien Grand 2015-05-04 11:52:03 +02:00
parent 72d99773dc
commit 8a19bf3aed
6 changed files with 247 additions and 106 deletions

View File

@ -75,7 +75,6 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
public static final String ACTION_NAME = BulkAction.NAME + "[s]";
private final MappingUpdatedAction mappingUpdatedAction;
private final UpdateHelper updateHelper;
private final boolean allowIdGeneration;
@ -83,9 +82,9 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
public TransportShardBulkAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction,
actionFilters,
BulkShardRequest.class, BulkShardRequest.class, ThreadPool.Names.BULK);
this.mappingUpdatedAction = mappingUpdatedAction;
this.updateHelper = updateHelper;
this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true);
}
@ -323,11 +322,9 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
static class WriteResult {
final ActionWriteResponse response;
final Engine.IndexingOperation op;
WriteResult(ActionWriteResponse response, Engine.IndexingOperation op) {
WriteResult(ActionWriteResponse response) {
this.response = response;
this.op = op;
}
@SuppressWarnings("unchecked")
@ -355,49 +352,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
indexRequest.process(clusterState.metaData(), mappingMd, allowIdGeneration, request.index());
}
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, indexRequest.source()).type(indexRequest.type()).id(indexRequest.id())
.routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl());
final Engine.IndexingOperation operation;
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
operation = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates() || indexRequest.canHaveDuplicates());
} else {
assert indexRequest.opType() == IndexRequest.OpType.CREATE : indexRequest.opType();
operation = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY,
request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId());
}
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
final boolean created;
if (update != null) {
final String indexName = indexService.index().name();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexService.mapperService();
mapperService.merge(indexRequest.type(), new CompressedString(update.toBytes()), true);
created = operation.execute(indexShard);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, indexRequest.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexRequest.type(), update);
created = operation.execute(indexShard);
}
} else {
created = operation.execute(indexShard);
}
// update the version on request so it will happen on the replicas
final long version = operation.version();
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
indexRequest.version(version);
assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());
IndexResponse indexResponse = new IndexResponse(request.index(), indexRequest.type(), indexRequest.id(), version, created);
return new WriteResult(indexResponse, operation);
final IndexResponse response = executeIndexRequestOnPrimary(request, indexRequest, indexShard);
return new WriteResult(response);
}
private WriteResult shardDeleteOperation(BulkShardRequest request, DeleteRequest deleteRequest, IndexShard indexShard) {
@ -410,7 +366,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version());
DeleteResponse deleteResponse = new DeleteResponse(request.index(), deleteRequest.type(), deleteRequest.id(), delete.version(), delete.found());
return new WriteResult(deleteResponse, null);
return new WriteResult(deleteResponse);
}
static class UpdateResult {

View File

@ -30,6 +30,7 @@ import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.ShardIterator;
@ -56,8 +57,10 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
@Inject
public TransportDeleteAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
TransportCreateIndexAction createIndexAction, ActionFilters actionFilters) {
super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
TransportCreateIndexAction createIndexAction, ActionFilters actionFilters,
MappingUpdatedAction mappingUpdatedAction) {
super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
mappingUpdatedAction, actionFilters,
DeleteRequest.class, DeleteRequest.class, ThreadPool.Names.INDEX);
this.createIndexAction = createIndexAction;
this.autoCreateIndex = new AutoCreateIndex(settings);

View File

@ -25,7 +25,6 @@ import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.index.IndexRequest.OpType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
@ -38,19 +37,16 @@ import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -69,7 +65,6 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
private final AutoCreateIndex autoCreateIndex;
private final boolean allowIdGeneration;
private final TransportCreateIndexAction createIndexAction;
private final MappingUpdatedAction mappingUpdatedAction;
private final ClusterService clusterService;
@ -77,10 +72,9 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
public TransportIndexAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
TransportCreateIndexAction createIndexAction, MappingUpdatedAction mappingUpdatedAction, ActionFilters actionFilters) {
super(settings, IndexAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
IndexRequest.class, IndexRequest.class, ThreadPool.Names.INDEX);
super(settings, IndexAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction,
actionFilters, IndexRequest.class, IndexRequest.class, ThreadPool.Names.INDEX);
this.createIndexAction = createIndexAction;
this.mappingUpdatedAction = mappingUpdatedAction;
this.autoCreateIndex = new AutoCreateIndex(settings);
this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true);
this.clusterService = clusterService;
@ -171,40 +165,8 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
final Engine.IndexingOperation operation;
if (request.opType() == IndexRequest.OpType.INDEX) {
operation = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
} else {
assert request.opType() == IndexRequest.OpType.CREATE : request.opType();
operation = indexShard.prepareCreate(sourceToParse,
request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
}
final boolean created;
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
final String indexName = indexService.index().name();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexService.mapperService();
mapperService.merge(request.type(), new CompressedString(update.toBytes()), true);
created = operation.execute(indexShard);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
created = operation.execute(indexShard);
}
} else {
created = operation.execute(indexShard);
}
final IndexResponse response = executeIndexRequestOnPrimary(null, request, indexShard);
if (request.refresh()) {
try {
@ -214,13 +176,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
}
}
// update the version on the request, so it will be used for the replicas
final long version = operation.version();
request.version(version);
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
assert request.versionType().validateVersionForWrites(request.version());
return new Tuple<>(new IndexResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), version, created), shardRequest.request);
return new Tuple<>(response, shardRequest.request);
}
@Override

View File

@ -25,12 +25,17 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.index.IndexRequest.OpType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@ -39,6 +44,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Settings;
@ -48,13 +54,18 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.RefCounted;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
@ -76,6 +87,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
protected final ShardStateAction shardStateAction;
protected final WriteConsistencyLevel defaultWriteConsistencyLevel;
protected final TransportRequestOptions transportOptions;
protected final MappingUpdatedAction mappingUpdatedAction;
final String transportReplicaAction;
final String executor;
@ -83,13 +95,15 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
protected TransportShardReplicationOperationAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters,
ThreadPool threadPool, ShardStateAction shardStateAction,
MappingUpdatedAction mappingUpdatedAction, ActionFilters actionFilters,
Class<Request> request, Class<ReplicaRequest> replicaRequest, String executor) {
super(settings, actionName, threadPool, actionFilters);
this.transportService = transportService;
this.clusterService = clusterService;
this.indicesService = indicesService;
this.shardStateAction = shardStateAction;
this.mappingUpdatedAction = mappingUpdatedAction;
this.transportReplicaAction = actionName + "[r]";
this.executor = executor;
@ -145,7 +159,8 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
protected boolean retryPrimaryException(Throwable e) {
return TransportActions.isShardNotAvailableException(e);
return e.getClass() == RetryOnPrimaryException.class
|| TransportActions.isShardNotAvailableException(e);
}
/**
@ -293,6 +308,17 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
}
protected static class RetryOnPrimaryException extends IndexShardException {
public RetryOnPrimaryException(ShardId shardId, String msg) {
super(shardId, msg);
}
public RetryOnPrimaryException(ShardId shardId, String msg, Throwable cause) {
super(shardId, msg, cause);
}
}
/**
* Responsible for performing all operations up to the point we start starting sending requests to replica shards.
* Including forwarding the request to another node if the primary is not assigned locally.
@ -1001,4 +1027,66 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
}
}
/** Utility method to create either an index or a create operation depending
* on the {@link OpType} of the request. */
private final Engine.IndexingOperation prepareIndexOperationOnPrimary(BulkShardRequest shardRequest, IndexRequest request, IndexShard indexShard) {
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
boolean canHaveDuplicates = request.canHaveDuplicates();
if (shardRequest != null) {
canHaveDuplicates |= shardRequest.canHaveDuplicates();
}
if (request.opType() == IndexRequest.OpType.INDEX) {
return indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, canHaveDuplicates);
} else {
assert request.opType() == IndexRequest.OpType.CREATE : request.opType();
return indexShard.prepareCreate(sourceToParse,
request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, canHaveDuplicates, canHaveDuplicates);
}
}
/** Execute the given {@link IndexRequest} on a primary shard, throwing a
* {@link RetryOnPrimaryException} if the operation needs to be re-tried. */
protected final IndexResponse executeIndexRequestOnPrimary(BulkShardRequest shardRequest, IndexRequest request, IndexShard indexShard) throws Throwable {
Engine.IndexingOperation operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
final boolean created;
final ShardId shardId = indexShard.shardId();
if (update != null) {
final String indexName = shardId.getIndex();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexShard.indexService().mapperService();
mapperService.merge(request.type(), new CompressedString(update.toBytes()), true);
created = operation.execute(indexShard);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnPrimaryException(shardId,
"Dynamics mappings are not available on the node that holds the primary yet");
}
created = operation.execute(indexShard);
}
} else {
created = operation.execute(indexShard);
}
// update the version on request so it will happen on the replicas
final long version = operation.version();
request.version(version);
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
assert request.versionType().validateVersionForWrites(request.version());
return new IndexResponse(shardId.getIndex(), request.type(), request.id(), request.version(), created);
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.support.replication;
import com.google.common.base.Predicate;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
@ -41,7 +42,13 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.ImmutableShardRouting;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
@ -77,8 +84,15 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.hamcrest.Matchers.*;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class ShardReplicationOperationTests extends ElasticsearchTestCase {
@ -687,7 +701,7 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
ClusterService clusterService,
ThreadPool threadPool) {
super(settings, actionName, transportService, clusterService, null, threadPool,
new ShardStateAction(settings, clusterService, transportService, null, null),
new ShardStateAction(settings, clusterService, transportService, null, null), null,
new ActionFilters(new HashSet<ActionFilter>()), Request.class, Request.class, ThreadPool.Names.SAME);
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
@ -35,7 +36,9 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.DiscoverySettings;
@ -49,10 +52,13 @@ import org.elasticsearch.test.disruption.BlockClusterStateProcessing;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@ -130,7 +136,125 @@ public class RareClusterStateTests extends ElasticsearchIntegrationTest {
assertHitCount(client().prepareSearch("test").get(), 0);
}
public void testDelayedMappingPropagationOnPrimary() throws Exception {
// Here we want to test that things go well if there is a first request
// that adds mappings but before mappings are propagated to all nodes
// another index request introduces the same mapping. The master node
// will reply immediately since it did not change the cluster state
// but the change might not be on the node that performed the indexing
// operation yet
Settings settings = ImmutableSettings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT, "0ms").build();
final List<String> nodeNames = internalCluster().startNodesAsync(2, settings).get();
assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut());
final String master = internalCluster().getMasterName();
assertThat(nodeNames, hasItem(master));
String otherNode = null;
for (String node : nodeNames) {
if (node.equals(master) == false) {
otherNode = node;
break;
}
}
assertNotNull(otherNode);
// Don't allocate the shard on the master node
assertAcked(prepareCreate("index").setSettings(ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.routing.allocation.exclude._name", master)).get());
ensureGreen();
// Check routing tables
ClusterState state = client().admin().cluster().prepareState().get().getState();
assertEquals(master, state.nodes().masterNode().name());
List<ShardRouting> shards = state.routingTable().allShards("index");
assertThat(shards, hasSize(1));
for (ShardRouting shard : shards) {
if (shard.primary()) {
// primary must not be on the master node
assertFalse(state.nodes().masterNodeId().equals(shard.currentNodeId()));
} else {
fail(); // only primaries
}
}
// Block cluster state processing where our shard is
BlockClusterStateProcessing disruption = new BlockClusterStateProcessing(otherNode, getRandom());
internalCluster().setDisruptionScheme(disruption);
disruption.startDisrupting();
// Add a new mapping...
final AtomicReference<Object> putMappingResponse = new AtomicReference<>();
client().admin().indices().preparePutMapping("index").setType("type").setSource("field", "type=long").execute(new ActionListener<PutMappingResponse>() {
@Override
public void onResponse(PutMappingResponse response) {
putMappingResponse.set(response);
}
@Override
public void onFailure(Throwable e) {
putMappingResponse.set(e);
}
});
// ...and wait for mappings to be available on master
assertBusy(new Runnable() {
@Override
public void run() {
ImmutableOpenMap<String, MappingMetaData> indexMappings = client().admin().indices().prepareGetMappings("index").get().getMappings().get("index");
assertNotNull(indexMappings);
MappingMetaData typeMappings = indexMappings.get("type");
assertNotNull(typeMappings);
Object properties;
try {
properties = typeMappings.getSourceAsMap().get("properties");
} catch (IOException e) {
throw new AssertionError(e);
}
assertNotNull(properties);
Object fieldMapping = ((Map<String, Object>) properties).get("field");
assertNotNull(fieldMapping);
}
});
final AtomicReference<Object> docIndexResponse = new AtomicReference<>();
client().prepareIndex("index", "type", "1").setSource("field", 42).execute(new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse response) {
docIndexResponse.set(response);
}
@Override
public void onFailure(Throwable e) {
docIndexResponse.set(e);
}
});
// Wait a bit to make sure that the reason why we did not get a response
// is that cluster state processing is blocked and not just that it takes
// time to process the indexing request
Thread.sleep(100);
assertThat(putMappingResponse.get(), equalTo(null));
assertThat(docIndexResponse.get(), equalTo(null));
// Now make sure the indexing request finishes successfully
disruption.stopDisrupting();
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(putMappingResponse.get(), instanceOf(PutMappingResponse.class));
PutMappingResponse resp = (PutMappingResponse) putMappingResponse.get();
assertTrue(resp.isAcknowledged());
assertThat(docIndexResponse.get(), instanceOf(IndexResponse.class));
IndexResponse docResp = (IndexResponse) docIndexResponse.get();
assertEquals(Arrays.toString(docResp.getShardInfo().getFailures()),
1, docResp.getShardInfo().getTotal());
}
});
}
public void testDelayedMappingPropagationOnReplica() throws Exception {
// This is essentially the same thing as testDelayedMappingPropagationOnPrimary
// but for replicas
// Here we want to test that everything goes well if the mappings that
// are needed for a document are not available on the replica at the
// time of indexing it