From 3e5b8a21b4d2e8a85c4037dd2296b7f6861b0a46 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Thu, 23 Apr 2015 19:05:31 +0200 Subject: [PATCH] Internal: Wait for required mappings to be available on the replica before indexing. Due to timing issues, mappings that are required to index a document might not be available on the replica at indexing time. In that case the replica starts listening to cluster state changes and re-parses the document until no dynamic mappings updates are generated. --- .../action/ActionWriteResponse.java | 8 ++ .../action/bulk/TransportShardBulkAction.java | 28 ++-- .../action/index/IndexResponse.java | 13 ++ .../action/index/TransportIndexAction.java | 35 ++--- ...nsportShardReplicationOperationAction.java | 103 ++++++++++++-- .../elasticsearch/cluster/ClusterService.java | 5 +- .../cluster/ClusterStateObserver.java | 43 +++--- .../service/InternalClusterService.java | 10 +- .../org/elasticsearch/common/Strings.java | 32 ++++- .../discovery/zen/ZenDiscovery.java | 6 +- .../discovery/zen/ZenDiscoveryUnitTest.java | 12 +- .../indices/state/RareClusterStateTests.java | 129 ++++++++++++++++++ 12 files changed, 338 insertions(+), 86 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/ActionWriteResponse.java b/src/main/java/org/elasticsearch/action/ActionWriteResponse.java index 5ddefaf99b2..11240cc2cbc 100644 --- a/src/main/java/org/elasticsearch/action/ActionWriteResponse.java +++ b/src/main/java/org/elasticsearch/action/ActionWriteResponse.java @@ -23,15 +23,18 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.bootstrap.Elasticsearch; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.rest.RestStatus; import java.io.IOException; +import java.util.Collections; /** * Base class for write action responses. @@ -156,6 +159,11 @@ public abstract class ActionWriteResponse extends ActionResponse { return builder; } + @Override + public String toString() { + return Strings.toString(this); + } + public static ShardInfo readShardInfo(StreamInput in) throws IOException { ShardInfo shardInfo = new ShardInfo(); shardInfo.readFrom(in); diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 6fac03ac5c9..d5009544a47 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -291,6 +291,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation } } + } else { + throw new ElasticsearchIllegalStateException("Unexpected index operation: " + item.request()); } assert item.getPrimaryResponse() != null; @@ -532,7 +534,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation @Override - protected void shardOperationOnReplica(ShardId shardId, BulkShardRequest request) throws Exception { + protected void shardOperationOnReplica(ShardId shardId, BulkShardRequest request) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexShard indexShard = indexService.shardSafe(shardId.id()); for (int i = 0; i < request.items().length; i++) { @@ -548,28 +550,18 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation if (indexRequest.opType() == IndexRequest.OpType.INDEX) { Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates()); - if (index.parsedDoc().dynamicMappingsUpdate() != null) { - if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) { - // mappings updates on the _river are not validated synchronously so we can't - // assume they are here when indexing on a replica - indexService.mapperService().merge(indexRequest.type(), new CompressedString(index.parsedDoc().dynamicMappingsUpdate().toBytes()), true); - } else { - throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + index.parsedDoc().dynamicMappingsUpdate() + "]"); - } + Mapping update = index.parsedDoc().dynamicMappingsUpdate(); + if (update != null) { + throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update); } indexShard.index(index); } else { Engine.Create create = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId()); - if (create.parsedDoc().dynamicMappingsUpdate() != null) { - if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) { - // mappings updates on the _river are not validated synchronously so we can't - // assume they are here when indexing on a replica - indexService.mapperService().merge(indexRequest.type(), new CompressedString(create.parsedDoc().dynamicMappingsUpdate().toBytes()), true); - } else { - throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + create.parsedDoc().dynamicMappingsUpdate() + "]"); - } + Mapping update = create.parsedDoc().dynamicMappingsUpdate(); + if (update != null) { + throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update); } indexShard.create(create); } @@ -592,6 +584,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation throw e; } } + } else { + throw new ElasticsearchIllegalStateException("Unexpected index operation: " + item.request()); } } diff --git a/src/main/java/org/elasticsearch/action/index/IndexResponse.java b/src/main/java/org/elasticsearch/action/index/IndexResponse.java index 0074d87b563..5727b2b673b 100644 --- a/src/main/java/org/elasticsearch/action/index/IndexResponse.java +++ b/src/main/java/org/elasticsearch/action/index/IndexResponse.java @@ -105,4 +105,17 @@ public class IndexResponse extends ActionWriteResponse { out.writeLong(version); out.writeBoolean(created); } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("IndexResponse["); + builder.append("index=").append(index); + builder.append(",type=").append(type); + builder.append(",id=").append(id); + builder.append(",version=").append(version); + builder.append(",created=").append(created); + builder.append(",shards=").append(getShardInfo()); + return builder.append("]").toString(); + } } diff --git a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 5a8c96f352c..2fd801c025d 100644 --- a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -19,7 +19,6 @@ package org.elasticsearch.action.index; -import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.RoutingMissingException; @@ -54,8 +53,6 @@ import org.elasticsearch.river.RiverIndexName; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.io.IOException; - /** * Performs the index operation. *

@@ -73,6 +70,8 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi private final TransportCreateIndexAction createIndexAction; private final MappingUpdatedAction mappingUpdatedAction; + private final ClusterService clusterService; + @Inject public TransportIndexAction(Settings settings, TransportService transportService, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, @@ -83,6 +82,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi this.mappingUpdatedAction = mappingUpdatedAction; this.autoCreateIndex = new AutoCreateIndex(settings); this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true); + this.clusterService = clusterService; } @Override @@ -201,6 +201,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi version = index.version(); created = index.created(); } else { + assert request.opType() == IndexRequest.OpType.CREATE : request.opType(); Engine.Create create = indexShard.prepareCreate(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId()); Mapping update = create.parsedDoc().dynamicMappingsUpdate(); @@ -244,34 +245,24 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi } @Override - protected void shardOperationOnReplica(ShardId shardId, IndexRequest request) throws IOException { + protected void shardOperationOnReplica(ShardId shardId, IndexRequest request) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexShard indexShard = indexService.shardSafe(shardId.id()); SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).type(request.type()).id(request.id()) .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl()); if (request.opType() == IndexRequest.OpType.INDEX) { Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates()); - if (index.parsedDoc().dynamicMappingsUpdate() != null) { - if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) { - // mappings updates on the _river are not validated synchronously so we can't - // assume they are here when indexing on a replica - indexService.mapperService().merge(request.type(), new CompressedString(index.parsedDoc().dynamicMappingsUpdate().toBytes()), true); - } else { - throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + index.parsedDoc().dynamicMappingsUpdate() + "]"); - } + Mapping update = index.parsedDoc().dynamicMappingsUpdate(); + if (update != null) { + throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update); } indexShard.index(index); } else { - Engine.Create create = indexShard.prepareCreate(sourceToParse, - request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates(), request.autoGeneratedId()); - if (create.parsedDoc().dynamicMappingsUpdate() != null) { - if (indexService.index().name().equals(RiverIndexName.Conf.indexName(settings))) { - // mappings updates on the _river are not validated synchronously so we can't - // assume they are here when indexing on a replica - indexService.mapperService().merge(request.type(), new CompressedString(create.parsedDoc().dynamicMappingsUpdate().toBytes()), true); - } else { - throw new ElasticsearchIllegalStateException("Index operations on replicas should not trigger dynamic mappings updates: [" + create.parsedDoc().dynamicMappingsUpdate() + "]"); - } + assert request.opType() == IndexRequest.OpType.CREATE : request.opType(); + Engine.Create create = indexShard.prepareCreate(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates(), request.autoGeneratedId()); + Mapping update = create.parsedDoc().dynamicMappingsUpdate(); + if (update != null) { + throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update); } indexShard.create(create); } diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java index bde8a67b098..ce26311d5fa 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java @@ -21,10 +21,11 @@ package org.elasticsearch.action.support.replication; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.Version; -import org.elasticsearch.action.*; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.UnavailableShardsException; +import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.ClusterService; @@ -35,11 +36,13 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -48,12 +51,21 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.DocumentAlreadyExistsException; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.*; +import org.elasticsearch.transport.BaseTransportResponseHandler; +import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.EmptyTransportResponseHandler; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequestHandler; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.Map; @@ -112,7 +124,7 @@ public abstract class TransportShardReplicationOperationAction shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable; - protected abstract void shardOperationOnReplica(ShardId shardId, ReplicaRequest shardRequest) throws Exception; + protected abstract void shardOperationOnReplica(ShardId shardId, ReplicaRequest shardRequest); protected abstract ShardIterator shards(ClusterState clusterState, InternalRequest request) throws ElasticsearchException; @@ -203,12 +215,77 @@ public abstract class TransportShardReplicationOperationAction { @Override public void messageReceived(final ReplicaRequest request, final TransportChannel channel) throws Exception { - try { - shardOperationOnReplica(request.internalShardId, request); - } catch (Throwable t) { - failReplicaIfNeeded(request.internalShardId.getIndex(), request.internalShardId.id(), t); - throw t; + new AsyncReplicaAction(request, channel).run(); + } + } + + protected static class RetryOnReplicaException extends IndexShardException { + + public RetryOnReplicaException(ShardId shardId, String msg) { + super(shardId, msg); + } + + public RetryOnReplicaException(ShardId shardId, String msg, Throwable cause) { + super(shardId, msg, cause); + } + } + + private final class AsyncReplicaAction extends AbstractRunnable { + private final ReplicaRequest request; + private final TransportChannel channel; + // important: we pass null as a timeout as failing a replica is + // something we want to avoid at all costs + private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger); + + + AsyncReplicaAction(ReplicaRequest request, TransportChannel channel) { + this.request = request; + this.channel = channel; + } + + @Override + public void onFailure(Throwable t) { + if (t instanceof RetryOnReplicaException) { + logger.trace("Retrying operation on replica", t); + observer.waitForNextChange(new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + threadPool.executor(executor).execute(AsyncReplicaAction.this); + } + + @Override + public void onClusterServiceClose() { + responseWithFailure(new NodeClosedException(clusterService.localNode())); + } + + @Override + public void onTimeout(TimeValue timeout) { + throw new AssertionError("Cannot happen: there is not timeout"); + } + }); + } else { + try { + failReplicaIfNeeded(request.internalShardId.getIndex(), request.internalShardId.id(), t); + } catch (Throwable unexpected) { + logger.error("{} unexpected error while failing replica", request.internalShardId.id(), unexpected); + } finally { + responseWithFailure(t); + } } + } + + protected void responseWithFailure(Throwable t) { + try { + channel.sendResponse(t); + } catch (IOException responseException) { + logger.warn("failed to send error message back to client for action [" + transportReplicaAction + "]", responseException); + logger.warn("actual Exception", t); + } + } + + @Override + protected void doRun() throws Exception { + shardOperationOnReplica(request.internalShardId, request); channel.sendResponse(TransportResponse.Empty.INSTANCE); } } diff --git a/src/main/java/org/elasticsearch/cluster/ClusterService.java b/src/main/java/org/elasticsearch/cluster/ClusterService.java index f456edd8ddb..805419ccc99 100644 --- a/src/main/java/org/elasticsearch/cluster/ClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/ClusterService.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.service.PendingClusterTask; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.unit.TimeValue; @@ -95,8 +96,10 @@ public interface ClusterService extends LifecycleComponent { * Adds a cluster state listener that will timeout after the provided timeout, * and is executed after the clusterstate has been successfully applied ie. is * in state {@link org.elasticsearch.cluster.ClusterState.ClusterStateStatus#APPLIED} + * NOTE: a {@code null} timeout means that the listener will never be removed + * automatically */ - void add(TimeValue timeout, TimeoutClusterStateListener listener); + void add(@Nullable TimeValue timeout, TimeoutClusterStateListener listener); /** * Submits a task that will update the cluster state. diff --git a/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java b/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java index a8909636932..28df5aa8017 100644 --- a/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java +++ b/src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java @@ -42,18 +42,18 @@ public class ClusterStateObserver { return changedEvent.previousState().version() != changedEvent.state().version(); } }; - private ClusterService clusterService; + + private final ClusterService clusterService; volatile TimeValue timeOutValue; final AtomicReference lastObservedState; + final TimeoutClusterStateListener clusterStateListener = new ObserverClusterStateListener(); // observingContext is not null when waiting on cluster state changes final AtomicReference observingContext = new AtomicReference(null); - volatile long startTime; + volatile Long startTime; volatile boolean timedOut; - volatile TimeoutClusterStateListener clusterStateListener = new ObserverClusterStateListener(); - public ClusterStateObserver(ClusterService clusterService, ESLogger logger) { this(clusterService, new TimeValue(60000), logger); @@ -65,10 +65,12 @@ public class ClusterStateObserver { * will fail any existing or new #waitForNextChange calls. */ public ClusterStateObserver(ClusterService clusterService, TimeValue timeout, ESLogger logger) { - this.timeOutValue = timeout; this.clusterService = clusterService; this.lastObservedState = new AtomicReference<>(new ObservedState(clusterService.state())); - this.startTime = System.currentTimeMillis(); + this.timeOutValue = timeout; + if (timeOutValue != null) { + this.startTime = System.currentTimeMillis(); + } this.logger = logger; } @@ -108,19 +110,24 @@ public class ClusterStateObserver { if (observingContext.get() != null) { throw new ElasticsearchException("already waiting for a cluster state change"); } - long timeoutTimeLeft; + + Long timeoutTimeLeft; if (timeOutValue == null) { timeOutValue = this.timeOutValue; - long timeSinceStart = System.currentTimeMillis() - startTime; - timeoutTimeLeft = timeOutValue.millis() - timeSinceStart; - if (timeoutTimeLeft <= 0l) { - // things have timeout while we were busy -> notify - logger.debug("observer timed out. notifying listener. timeout setting [{}], time since start [{}]", timeOutValue, new TimeValue(timeSinceStart)); - // update to latest, in case people want to retry - timedOut = true; - lastObservedState.set(new ObservedState(clusterService.state())); - listener.onTimeout(timeOutValue); - return; + if (timeOutValue != null) { + long timeSinceStart = System.currentTimeMillis() - startTime; + timeoutTimeLeft = timeOutValue.millis() - timeSinceStart; + if (timeoutTimeLeft <= 0l) { + // things have timeout while we were busy -> notify + logger.debug("observer timed out. notifying listener. timeout setting [{}], time since start [{}]", timeOutValue, new TimeValue(timeSinceStart)); + // update to latest, in case people want to retry + timedOut = true; + lastObservedState.set(new ObservedState(clusterService.state())); + listener.onTimeout(timeOutValue); + return; + } + } else { + timeoutTimeLeft = null; } } else { this.startTime = System.currentTimeMillis(); @@ -143,7 +150,7 @@ public class ClusterStateObserver { if (!observingContext.compareAndSet(null, context)) { throw new ElasticsearchException("already waiting for a cluster state change"); } - clusterService.add(new TimeValue(timeoutTimeLeft), clusterStateListener); + clusterService.add(timeoutTimeLeft == null ? null : new TimeValue(timeoutTimeLeft), clusterStateListener); } } diff --git a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index ea6866c420f..eb527e07fe4 100644 --- a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -230,7 +230,7 @@ public class InternalClusterService extends AbstractLifecycleComponent implemen static class ProcessClusterState { final ClusterState clusterState; - final PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed; volatile boolean processed; - ProcessClusterState(ClusterState clusterState, PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) { + ProcessClusterState(ClusterState clusterState) { this.clusterState = clusterState; - this.newStateProcessed = newStateProcessed; } } @@ -738,7 +736,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen newStateProcessed.onNewClusterStateFailed(new ElasticsearchIllegalStateException("received state from a node that is not part of the cluster")); } else { - final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState, newStateProcessed); + final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState); processNewClusterStates.add(processClusterState); assert newClusterState.nodes().masterNode() != null : "received a cluster state without a master"; diff --git a/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTest.java b/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTest.java index 4d7811fa023..b276f5787a9 100644 --- a/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTest.java +++ b/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTest.java @@ -105,7 +105,7 @@ public class ZenDiscoveryUnitTest extends ElasticsearchTestCase { int numUpdates = scaledRandomIntBetween(50, 100); LinkedList queue = new LinkedList<>(); for (int i = 0; i < numUpdates; i++) { - queue.add(new ProcessClusterState(ClusterState.builder(clusterName).version(i).nodes(nodes).build(), null)); + queue.add(new ProcessClusterState(ClusterState.builder(clusterName).version(i).nodes(nodes).build())); } ProcessClusterState mostRecent = queue.get(numUpdates - 1); Collections.shuffle(queue, getRandom()); @@ -121,15 +121,15 @@ public class ZenDiscoveryUnitTest extends ElasticsearchTestCase { DiscoveryNodes nodes2 = DiscoveryNodes.builder().masterNodeId("b").build(); LinkedList queue = new LinkedList<>(); - ProcessClusterState thirdMostRecent = new ProcessClusterState(ClusterState.builder(clusterName).version(1).nodes(nodes1).build(), null); + ProcessClusterState thirdMostRecent = new ProcessClusterState(ClusterState.builder(clusterName).version(1).nodes(nodes1).build()); queue.offer(thirdMostRecent); - ProcessClusterState secondMostRecent = new ProcessClusterState(ClusterState.builder(clusterName).version(2).nodes(nodes1).build(), null); + ProcessClusterState secondMostRecent = new ProcessClusterState(ClusterState.builder(clusterName).version(2).nodes(nodes1).build()); queue.offer(secondMostRecent); - ProcessClusterState mostRecent = new ProcessClusterState(ClusterState.builder(clusterName).version(3).nodes(nodes1).build(), null); + ProcessClusterState mostRecent = new ProcessClusterState(ClusterState.builder(clusterName).version(3).nodes(nodes1).build()); queue.offer(mostRecent); Collections.shuffle(queue, getRandom()); - queue.offer(new ProcessClusterState(ClusterState.builder(clusterName).version(4).nodes(nodes2).build(), null)); - queue.offer(new ProcessClusterState(ClusterState.builder(clusterName).version(5).nodes(nodes1).build(), null)); + queue.offer(new ProcessClusterState(ClusterState.builder(clusterName).version(4).nodes(nodes2).build())); + queue.offer(new ProcessClusterState(ClusterState.builder(clusterName).version(5).nodes(nodes1).build())); assertThat(ZenDiscovery.selectNextStateToProcess(queue), sameInstance(mostRecent.clusterState)); diff --git a/src/test/java/org/elasticsearch/indices/state/RareClusterStateTests.java b/src/test/java/org/elasticsearch/indices/state/RareClusterStateTests.java index 2875a90824d..a46a273b2ba 100644 --- a/src/test/java/org/elasticsearch/indices/state/RareClusterStateTests.java +++ b/src/test/java/org/elasticsearch/indices/state/RareClusterStateTests.java @@ -20,6 +20,10 @@ package org.elasticsearch.indices.state; import com.google.common.collect.ImmutableMap; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.DiskUsage; @@ -27,6 +31,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; @@ -35,6 +40,10 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.gateway.GatewayAllocator; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.mapper.DocumentMapper; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.disruption.BlockClusterStateProcessing; import org.elasticsearch.test.junit.annotations.TestLogging; @@ -42,10 +51,16 @@ import org.junit.Test; import java.util.Arrays; import java.util.HashSet; +import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; /** */ @@ -115,4 +130,118 @@ public class RareClusterStateTests extends ElasticsearchIntegrationTest { assertHitCount(client().prepareSearch("test").get(), 0); } + public void testDelayedMappingPropagationOnReplica() throws Exception { + // Here we want to test that everything goes well if the mappings that + // are needed for a document are not available on the replica at the + // time of indexing it + final List nodeNames = internalCluster().startNodesAsync(2).get(); + assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut()); + + final String master = internalCluster().getMasterName(); + assertThat(nodeNames, hasItem(master)); + String otherNode = null; + for (String node : nodeNames) { + if (node.equals(master) == false) { + otherNode = node; + break; + } + } + assertNotNull(otherNode); + + // Force allocation of the primary on the master node by first only allocating on the master + // and then allowing all nodes so that the replica gets allocated on the other node + assertAcked(prepareCreate("index").setSettings(ImmutableSettings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put("index.routing.allocation.include._name", master)).get()); + assertAcked(client().admin().indices().prepareUpdateSettings("index").setSettings(ImmutableSettings.builder() + .put("index.routing.allocation.include._name", "")).get()); + ensureGreen(); + + // Check routing tables + ClusterState state = client().admin().cluster().prepareState().get().getState(); + assertEquals(master, state.nodes().masterNode().name()); + List shards = state.routingTable().allShards("index"); + assertThat(shards, hasSize(2)); + for (ShardRouting shard : shards) { + if (shard.primary()) { + // primary must be on the master + assertEquals(state.nodes().masterNodeId(), shard.currentNodeId()); + } else { + assertTrue(shard.active()); + } + } + + // Block cluster state processing on the replica + BlockClusterStateProcessing disruption = new BlockClusterStateProcessing(otherNode, getRandom()); + internalCluster().setDisruptionScheme(disruption); + disruption.startDisrupting(); + final AtomicReference putMappingResponse = new AtomicReference<>(); + client().admin().indices().preparePutMapping("index").setType("type").setSource("field", "type=long").execute(new ActionListener() { + @Override + public void onResponse(PutMappingResponse response) { + putMappingResponse.set(response); + } + @Override + public void onFailure(Throwable e) { + putMappingResponse.set(e); + } + }); + // Wait for mappings to be available on master + assertBusy(new Runnable() { + @Override + public void run() { + final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, master); + final IndexService indexService = indicesService.indexServiceSafe("index"); + assertNotNull(indexService); + final MapperService mapperService = indexService.mapperService(); + DocumentMapper mapper = mapperService.documentMapper("type"); + assertNotNull(mapper); + assertNotNull(mapper.mappers().getMapper("field")); + } + }); + + final AtomicReference docIndexResponse = new AtomicReference<>(); + client().prepareIndex("index", "type", "1").setSource("field", 42).execute(new ActionListener() { + @Override + public void onResponse(IndexResponse response) { + docIndexResponse.set(response); + } + @Override + public void onFailure(Throwable e) { + docIndexResponse.set(e); + } + }); + + // Wait for document to be indexed on primary + assertBusy(new Runnable() { + @Override + public void run() { + assertTrue(client().prepareGet("index", "type", "1").setPreference("_primary").get().isExists()); + } + }); + + // The mappings have not been propagated to the replica yet as a consequence the document count not be indexed + // We wait on purpose to make sure that the document is not indexed because the shard operation is stalled + // and not just because it takes time to replicate the indexing request to the replica + Thread.sleep(100); + assertThat(putMappingResponse.get(), equalTo(null)); + assertThat(docIndexResponse.get(), equalTo(null)); + + // Now make sure the indexing request finishes successfully + disruption.stopDisrupting(); + assertBusy(new Runnable() { + @Override + public void run() { + assertThat(putMappingResponse.get(), instanceOf(PutMappingResponse.class)); + PutMappingResponse resp = (PutMappingResponse) putMappingResponse.get(); + assertTrue(resp.isAcknowledged()); + assertThat(docIndexResponse.get(), instanceOf(IndexResponse.class)); + IndexResponse docResp = (IndexResponse) docIndexResponse.get(); + assertEquals(Arrays.toString(docResp.getShardInfo().getFailures()), + 2, docResp.getShardInfo().getTotal()); // both shards should have succeeded + } + }); + } + }