Merge pull request #10949 from jpountz/fix/wait_for_mappings_on_primary

Mappings: Wait for mappings to be available on the primary before indexing.
This commit is contained in:
Adrien Grand 2015-05-06 16:22:37 +02:00
commit 19a6cb246e
6 changed files with 247 additions and 106 deletions

View File

@ -75,7 +75,6 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
public static final String ACTION_NAME = BulkAction.NAME + "[s]";
private final MappingUpdatedAction mappingUpdatedAction;
private final UpdateHelper updateHelper;
private final boolean allowIdGeneration;
@ -83,9 +82,9 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
public TransportShardBulkAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction,
actionFilters,
BulkShardRequest.class, BulkShardRequest.class, ThreadPool.Names.BULK);
this.mappingUpdatedAction = mappingUpdatedAction;
this.updateHelper = updateHelper;
this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true);
}
@ -323,11 +322,9 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
static class WriteResult {
final ActionWriteResponse response;
final Engine.IndexingOperation op;
WriteResult(ActionWriteResponse response, Engine.IndexingOperation op) {
WriteResult(ActionWriteResponse response) {
this.response = response;
this.op = op;
}
@SuppressWarnings("unchecked")
@ -355,49 +352,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
indexRequest.process(clusterState.metaData(), mappingMd, allowIdGeneration, request.index());
}
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, indexRequest.source()).type(indexRequest.type()).id(indexRequest.id())
.routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl());
final Engine.IndexingOperation operation;
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
operation = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates() || indexRequest.canHaveDuplicates());
} else {
assert indexRequest.opType() == IndexRequest.OpType.CREATE : indexRequest.opType();
operation = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY,
request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId());
}
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
final boolean created;
if (update != null) {
final String indexName = indexService.index().name();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexService.mapperService();
mapperService.merge(indexRequest.type(), new CompressedString(update.toBytes()), true);
created = operation.execute(indexShard);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, indexRequest.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, indexRequest.type(), update);
created = operation.execute(indexShard);
}
} else {
created = operation.execute(indexShard);
}
// update the version on request so it will happen on the replicas
final long version = operation.version();
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
indexRequest.version(version);
assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());
IndexResponse indexResponse = new IndexResponse(request.index(), indexRequest.type(), indexRequest.id(), version, created);
return new WriteResult(indexResponse, operation);
final IndexResponse response = executeIndexRequestOnPrimary(request, indexRequest, indexShard);
return new WriteResult(response);
}
private WriteResult shardDeleteOperation(BulkShardRequest request, DeleteRequest deleteRequest, IndexShard indexShard) {
@ -410,7 +366,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version());
DeleteResponse deleteResponse = new DeleteResponse(request.index(), deleteRequest.type(), deleteRequest.id(), delete.version(), delete.found());
return new WriteResult(deleteResponse, null);
return new WriteResult(deleteResponse);
}
static class UpdateResult {

View File

@ -30,6 +30,7 @@ import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.ShardIterator;
@ -56,8 +57,10 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
@Inject
public TransportDeleteAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
TransportCreateIndexAction createIndexAction, ActionFilters actionFilters) {
super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
TransportCreateIndexAction createIndexAction, ActionFilters actionFilters,
MappingUpdatedAction mappingUpdatedAction) {
super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
mappingUpdatedAction, actionFilters,
DeleteRequest.class, DeleteRequest.class, ThreadPool.Names.INDEX);
this.createIndexAction = createIndexAction;
this.autoCreateIndex = new AutoCreateIndex(settings);

View File

@ -25,7 +25,6 @@ import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.index.IndexRequest.OpType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
@ -38,19 +37,16 @@ import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -69,7 +65,6 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
private final AutoCreateIndex autoCreateIndex;
private final boolean allowIdGeneration;
private final TransportCreateIndexAction createIndexAction;
private final MappingUpdatedAction mappingUpdatedAction;
private final ClusterService clusterService;
@ -77,10 +72,9 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
public TransportIndexAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
TransportCreateIndexAction createIndexAction, MappingUpdatedAction mappingUpdatedAction, ActionFilters actionFilters) {
super(settings, IndexAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
IndexRequest.class, IndexRequest.class, ThreadPool.Names.INDEX);
super(settings, IndexAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction,
actionFilters, IndexRequest.class, IndexRequest.class, ThreadPool.Names.INDEX);
this.createIndexAction = createIndexAction;
this.mappingUpdatedAction = mappingUpdatedAction;
this.autoCreateIndex = new AutoCreateIndex(settings);
this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true);
this.clusterService = clusterService;
@ -171,40 +165,8 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
final Engine.IndexingOperation operation;
if (request.opType() == IndexRequest.OpType.INDEX) {
operation = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
} else {
assert request.opType() == IndexRequest.OpType.CREATE : request.opType();
operation = indexShard.prepareCreate(sourceToParse,
request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
}
final boolean created;
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
final String indexName = indexService.index().name();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexService.mapperService();
mapperService.merge(request.type(), new CompressedString(update.toBytes()), true);
created = operation.execute(indexShard);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
created = operation.execute(indexShard);
}
} else {
created = operation.execute(indexShard);
}
final IndexResponse response = executeIndexRequestOnPrimary(null, request, indexShard);
if (request.refresh()) {
try {
@ -214,13 +176,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
}
}
// update the version on the request, so it will be used for the replicas
final long version = operation.version();
request.version(version);
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
assert request.versionType().validateVersionForWrites(request.version());
return new Tuple<>(new IndexResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), version, created), shardRequest.request);
return new Tuple<>(response, shardRequest.request);
}
@Override

View File

@ -25,12 +25,17 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.index.IndexRequest.OpType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@ -39,6 +44,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Settings;
@ -48,13 +54,18 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.RefCounted;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
@ -76,6 +87,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
protected final ShardStateAction shardStateAction;
protected final WriteConsistencyLevel defaultWriteConsistencyLevel;
protected final TransportRequestOptions transportOptions;
protected final MappingUpdatedAction mappingUpdatedAction;
final String transportReplicaAction;
final String executor;
@ -83,13 +95,15 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
protected TransportShardReplicationOperationAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction, ActionFilters actionFilters,
ThreadPool threadPool, ShardStateAction shardStateAction,
MappingUpdatedAction mappingUpdatedAction, ActionFilters actionFilters,
Class<Request> request, Class<ReplicaRequest> replicaRequest, String executor) {
super(settings, actionName, threadPool, actionFilters);
this.transportService = transportService;
this.clusterService = clusterService;
this.indicesService = indicesService;
this.shardStateAction = shardStateAction;
this.mappingUpdatedAction = mappingUpdatedAction;
this.transportReplicaAction = actionName + "[r]";
this.executor = executor;
@ -145,7 +159,8 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
protected boolean retryPrimaryException(Throwable e) {
return TransportActions.isShardNotAvailableException(e);
return e.getClass() == RetryOnPrimaryException.class
|| TransportActions.isShardNotAvailableException(e);
}
/**
@ -293,6 +308,17 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
}
protected static class RetryOnPrimaryException extends IndexShardException {
public RetryOnPrimaryException(ShardId shardId, String msg) {
super(shardId, msg);
}
public RetryOnPrimaryException(ShardId shardId, String msg, Throwable cause) {
super(shardId, msg, cause);
}
}
/**
* Responsible for performing all operations up to the point we start starting sending requests to replica shards.
* Including forwarding the request to another node if the primary is not assigned locally.
@ -1001,4 +1027,66 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
}
}
/** Utility method to create either an index or a create operation depending
* on the {@link OpType} of the request. */
private final Engine.IndexingOperation prepareIndexOperationOnPrimary(BulkShardRequest shardRequest, IndexRequest request, IndexShard indexShard) {
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
boolean canHaveDuplicates = request.canHaveDuplicates();
if (shardRequest != null) {
canHaveDuplicates |= shardRequest.canHaveDuplicates();
}
if (request.opType() == IndexRequest.OpType.INDEX) {
return indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, canHaveDuplicates);
} else {
assert request.opType() == IndexRequest.OpType.CREATE : request.opType();
return indexShard.prepareCreate(sourceToParse,
request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, canHaveDuplicates, canHaveDuplicates);
}
}
/** Execute the given {@link IndexRequest} on a primary shard, throwing a
* {@link RetryOnPrimaryException} if the operation needs to be re-tried. */
protected final IndexResponse executeIndexRequestOnPrimary(BulkShardRequest shardRequest, IndexRequest request, IndexShard indexShard) throws Throwable {
Engine.IndexingOperation operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
final boolean created;
final ShardId shardId = indexShard.shardId();
if (update != null) {
final String indexName = shardId.getIndex();
if (indexName.equals(RiverIndexName.Conf.indexName(settings))) {
// With rivers, we have a chicken and egg problem if indexing
// the _meta document triggers a mapping update. Because we would
// like to validate the mapping update first, but on the other
// hand putting the mapping would start the river, which expects
// to find a _meta document
// So we have no choice but to index first and send mappings afterwards
MapperService mapperService = indexShard.indexService().mapperService();
mapperService.merge(request.type(), new CompressedString(update.toBytes()), true);
created = operation.execute(indexShard);
mappingUpdatedAction.updateMappingOnMasterAsynchronously(indexName, request.type(), update);
} else {
mappingUpdatedAction.updateMappingOnMasterSynchronously(indexName, request.type(), update);
operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnPrimaryException(shardId,
"Dynamics mappings are not available on the node that holds the primary yet");
}
created = operation.execute(indexShard);
}
} else {
created = operation.execute(indexShard);
}
// update the version on request so it will happen on the replicas
final long version = operation.version();
request.version(version);
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
assert request.versionType().validateVersionForWrites(request.version());
return new IndexResponse(shardId.getIndex(), request.type(), request.id(), request.version(), created);
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.support.replication;
import com.google.common.base.Predicate;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
@ -41,7 +42,13 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.ImmutableShardRouting;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
@ -77,8 +84,15 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.hamcrest.Matchers.*;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class ShardReplicationOperationTests extends ElasticsearchTestCase {
@ -687,7 +701,7 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
ClusterService clusterService,
ThreadPool threadPool) {
super(settings, actionName, transportService, clusterService, null, threadPool,
new ShardStateAction(settings, clusterService, transportService, null, null),
new ShardStateAction(settings, clusterService, transportService, null, null), null,
new ActionFilters(new HashSet<ActionFilter>()), Request.class, Request.class, ThreadPool.Names.SAME);
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
@ -35,7 +36,9 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.DiscoverySettings;
@ -49,10 +52,13 @@ import org.elasticsearch.test.disruption.BlockClusterStateProcessing;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@ -130,7 +136,125 @@ public class RareClusterStateTests extends ElasticsearchIntegrationTest {
assertHitCount(client().prepareSearch("test").get(), 0);
}
public void testDelayedMappingPropagationOnPrimary() throws Exception {
// Here we want to test that things go well if there is a first request
// that adds mappings but before mappings are propagated to all nodes
// another index request introduces the same mapping. The master node
// will reply immediately since it did not change the cluster state
// but the change might not be on the node that performed the indexing
// operation yet
Settings settings = ImmutableSettings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT, "0ms").build();
final List<String> nodeNames = internalCluster().startNodesAsync(2, settings).get();
assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut());
final String master = internalCluster().getMasterName();
assertThat(nodeNames, hasItem(master));
String otherNode = null;
for (String node : nodeNames) {
if (node.equals(master) == false) {
otherNode = node;
break;
}
}
assertNotNull(otherNode);
// Don't allocate the shard on the master node
assertAcked(prepareCreate("index").setSettings(ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.routing.allocation.exclude._name", master)).get());
ensureGreen();
// Check routing tables
ClusterState state = client().admin().cluster().prepareState().get().getState();
assertEquals(master, state.nodes().masterNode().name());
List<ShardRouting> shards = state.routingTable().allShards("index");
assertThat(shards, hasSize(1));
for (ShardRouting shard : shards) {
if (shard.primary()) {
// primary must not be on the master node
assertFalse(state.nodes().masterNodeId().equals(shard.currentNodeId()));
} else {
fail(); // only primaries
}
}
// Block cluster state processing where our shard is
BlockClusterStateProcessing disruption = new BlockClusterStateProcessing(otherNode, getRandom());
internalCluster().setDisruptionScheme(disruption);
disruption.startDisrupting();
// Add a new mapping...
final AtomicReference<Object> putMappingResponse = new AtomicReference<>();
client().admin().indices().preparePutMapping("index").setType("type").setSource("field", "type=long").execute(new ActionListener<PutMappingResponse>() {
@Override
public void onResponse(PutMappingResponse response) {
putMappingResponse.set(response);
}
@Override
public void onFailure(Throwable e) {
putMappingResponse.set(e);
}
});
// ...and wait for mappings to be available on master
assertBusy(new Runnable() {
@Override
public void run() {
ImmutableOpenMap<String, MappingMetaData> indexMappings = client().admin().indices().prepareGetMappings("index").get().getMappings().get("index");
assertNotNull(indexMappings);
MappingMetaData typeMappings = indexMappings.get("type");
assertNotNull(typeMappings);
Object properties;
try {
properties = typeMappings.getSourceAsMap().get("properties");
} catch (IOException e) {
throw new AssertionError(e);
}
assertNotNull(properties);
Object fieldMapping = ((Map<String, Object>) properties).get("field");
assertNotNull(fieldMapping);
}
});
final AtomicReference<Object> docIndexResponse = new AtomicReference<>();
client().prepareIndex("index", "type", "1").setSource("field", 42).execute(new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse response) {
docIndexResponse.set(response);
}
@Override
public void onFailure(Throwable e) {
docIndexResponse.set(e);
}
});
// Wait a bit to make sure that the reason why we did not get a response
// is that cluster state processing is blocked and not just that it takes
// time to process the indexing request
Thread.sleep(100);
assertThat(putMappingResponse.get(), equalTo(null));
assertThat(docIndexResponse.get(), equalTo(null));
// Now make sure the indexing request finishes successfully
disruption.stopDisrupting();
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(putMappingResponse.get(), instanceOf(PutMappingResponse.class));
PutMappingResponse resp = (PutMappingResponse) putMappingResponse.get();
assertTrue(resp.isAcknowledged());
assertThat(docIndexResponse.get(), instanceOf(IndexResponse.class));
IndexResponse docResp = (IndexResponse) docIndexResponse.get();
assertEquals(Arrays.toString(docResp.getShardInfo().getFailures()),
1, docResp.getShardInfo().getTotal());
}
});
}
public void testDelayedMappingPropagationOnReplica() throws Exception {
// This is essentially the same thing as testDelayedMappingPropagationOnPrimary
// but for replicas
// Here we want to test that everything goes well if the mappings that
// are needed for a document are not available on the replica at the
// time of indexing it